Repository: incubator-zeppelin Updated Branches: refs/heads/master 215447693 -> 8cde5c9bd
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHeartbeat.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHeartbeat.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHeartbeat.java new file mode 100644 index 0000000..8a85c84 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHeartbeat.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler; + +import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.ZeppelinClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Routine that sends PING to all connected Zeppelin ws connections. 
+ * Each run logs at debug level and then calls {@code client.pingAllNotes()}. + */ +public class ZeppelinHeartbeat implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(ZeppelinHeartbeat.class); + private final ZeppelinClient client; + + public static ZeppelinHeartbeat newInstance(ZeppelinClient client) { + return new ZeppelinHeartbeat(client); + } + + private ZeppelinHeartbeat(ZeppelinClient client) { + this.client = client; + } + + @Override + public void run() { + LOG.debug("Sending PING to all connected Zeppelin notes"); + client.pingAllNotes(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHubHeartbeat.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHubHeartbeat.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHubHeartbeat.java new file mode 100644 index 0000000..f982365 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHubHeartbeat.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler; + +import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.ZeppelinhubClient; +import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils.ZeppelinhubUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Routine that sends a PING event to zeppelinhub. + * Each run passes {@code ZeppelinhubUtils.pingMessage(client.getToken())} to {@code client.send}. + */ +public class ZeppelinHubHeartbeat implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(ZeppelinHubHeartbeat.class); + private ZeppelinhubClient client; + + public static ZeppelinHubHeartbeat newInstance(ZeppelinhubClient client) { + return new ZeppelinHubHeartbeat(client); + } + + private ZeppelinHubHeartbeat(ZeppelinhubClient client) { + this.client = client; + } + + @Override + public void run() { + LOG.debug("Sending PING to zeppelinhub"); + client.send(ZeppelinhubUtils.pingMessage(client.getToken())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/session/ZeppelinhubSession.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/session/ZeppelinhubSession.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/session/ZeppelinhubSession.java new file mode 100644 index 0000000..86cd4ad --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/session/ZeppelinhubSession.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.session; + +import org.apache.commons.lang.StringUtils; +import org.eclipse.jetty.websocket.api.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Zeppelinhub session: pairs a Jetty websocket {@code Session} (null for {@code EMPTY}) with a token. + */ +public class ZeppelinhubSession { + private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubSession.class); + private Session session; + private final String token; + + public static final ZeppelinhubSession EMPTY = new ZeppelinhubSession(null, StringUtils.EMPTY); + + public static ZeppelinhubSession createInstance(Session session, String token) { + return new ZeppelinhubSession(session, token); + } + + private ZeppelinhubSession(Session session, String token) { + this.session = session; + this.token = token; + } + + public boolean isSessionOpen() { + return ((session != null) && (session.isOpen())); + } + + public void close() { + if (isSessionOpen()) { + session.close(); + } + } + + public void sendByFuture(String msg) { + // A blank message is only reported; it is never handed to the socket. + if (StringUtils.isBlank(msg)) { + LOG.error("Cannot send event to Zeppelinhub, msg is empty"); + } else if (isSessionOpen()) { + session.getRemote().sendStringByFuture(msg); + } else { + LOG.error("Cannot send event to Zeppelinhub, session is close"); + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/utils/ZeppelinhubUtils.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/utils/ZeppelinhubUtils.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/utils/ZeppelinhubUtils.java new file mode 100644 index 0000000..c13abe5 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/utils/ZeppelinhubUtils.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils; + +import java.util.HashMap; + +import org.apache.commons.lang.StringUtils; +import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinHubOp; +import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinhubMessage; +import org.apache.zeppelin.notebook.socket.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Helper class. 
+ * Builds serialized LIVE/DEAD/PING messages and resolves op names to enum constants. + */ +public class ZeppelinhubUtils { + private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubUtils.class); + + public static String liveMessage(String token) { + if (StringUtils.isBlank(token)) { + LOG.error("Cannot create Live message: token is null or empty"); + return ZeppelinhubMessage.EMPTY.serialize(); + } + HashMap<String, Object> data = new HashMap<String, Object>(); + data.put("token", token); + return ZeppelinhubMessage + .newMessage(ZeppelinHubOp.LIVE, data, new HashMap<String, String>()) + .serialize(); + } + + public static String deadMessage(String token) { + if (StringUtils.isBlank(token)) { + LOG.error("Cannot create Dead message: token is null or empty"); + return ZeppelinhubMessage.EMPTY.serialize(); + } + HashMap<String, Object> data = new HashMap<String, Object>(); + data.put("token", token); + return ZeppelinhubMessage + .newMessage(ZeppelinHubOp.DEAD, data, new HashMap<String, String>()) + .serialize(); + } + + public static String pingMessage(String token) { + if (StringUtils.isBlank(token)) { + LOG.error("Cannot create Ping message: token is null or empty"); + return ZeppelinhubMessage.EMPTY.serialize(); + } + HashMap<String, Object> data = new HashMap<String, Object>(); + data.put("token", token); + return ZeppelinhubMessage + .newMessage(ZeppelinHubOp.PING, data, new HashMap<String, String>()) + .serialize(); + } + + public static ZeppelinHubOp toZeppelinHubOp(String text) { + ZeppelinHubOp hubOp = null; + try { + // valueOf(null) throws NullPointerException, so only look up non-null names + hubOp = (text == null) ? null : ZeppelinHubOp.valueOf(text); + } catch (IllegalArgumentException e) { // text does not name a ZeppelinHub op: keep null + } + return hubOp; + } + + public static boolean isZeppelinHubOp(String text) { + return (toZeppelinHubOp(text) != null); + } + + public static Message.OP toZeppelinOp(String text) { + Message.OP zeppelinOp = null; + try { + // valueOf(null) throws NullPointerException, so only look up non-null names + zeppelinOp = (text == null) ? null : Message.OP.valueOf(text); + } catch (IllegalArgumentException e) { // text does not name a Zeppelin op: keep null + } + return zeppelinOp; + } + + public static boolean isZeppelinOp(String 
text) { + return (toZeppelinOp(text) != null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java new file mode 100644 index 0000000..a3fc048 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.socket; + +import java.util.HashMap; +import java.util.Map; + +/** + * Zeppelin websocket message template class. + */ +public class Message { + /** + * Representation of event type. 
+ */ + public static enum OP { + GET_HOME_NOTE, // [c-s] load note for home screen + + GET_NOTE, // [c-s] client load note + // @param id note id + + NOTE, // [s-c] note info + // @param note serialized Note object + + PARAGRAPH, // [s-c] paragraph info + // @param paragraph serialized paragraph object + + PROGRESS, // [s-c] progress update + // @param id paragraph id + // @param progress percentage progress + + NEW_NOTE, // [c-s] create new notebook + DEL_NOTE, // [c-s] delete notebook + // @param id note id + CLONE_NOTE, // [c-s] clone new notebook + // @param id id of note to clone + // @param name name for the cloned note + IMPORT_NOTE, // [c-s] import notebook + // @param object notebook + NOTE_UPDATE, + + RUN_PARAGRAPH, // [c-s] run paragraph + // @param id paragraph id + // @param paragraph paragraph content, i.e. script + // @param config paragraph config + // @param params paragraph params + + COMMIT_PARAGRAPH, // [c-s] commit paragraph + // @param id paragraph id + // @param title paragraph title + // @param paragraph paragraph content, i.e. 
script + // @param config paragraph config + // @param params paragraph params + + CANCEL_PARAGRAPH, // [c-s] cancel paragraph run + // @param id paragraph id + + MOVE_PARAGRAPH, // [c-s] move paragraph order + // @param id paragraph id + // @param index index the paragraph want to go + + INSERT_PARAGRAPH, // [c-s] create new paragraph below current paragraph + // @param target index + + COMPLETION, // [c-s] ask completion candidates + // @param id + // @param buf current code + // @param cursor cursor position in code + + COMPLETION_LIST, // [s-c] send back completion candidates list + // @param id + // @param completions list of string + + LIST_NOTES, // [c-s] ask list of note + RELOAD_NOTES_FROM_REPO, // [c-s] reload notes from repo + + NOTES_INFO, // [s-c] list of note infos + // @param notes serialized List<NoteInfo> object + + PARAGRAPH_REMOVE, + PARAGRAPH_CLEAR_OUTPUT, + PARAGRAPH_APPEND_OUTPUT, // [s-c] append output + PARAGRAPH_UPDATE_OUTPUT, // [s-c] update (replace) output + PING, + AUTH_INFO, + + ANGULAR_OBJECT_UPDATE, // [s-c] add/update angular object + ANGULAR_OBJECT_REMOVE, // [s-c] add angular object del + + ANGULAR_OBJECT_UPDATED, // [c-s] angular object value updated, + + ANGULAR_OBJECT_CLIENT_BIND, // [c-s] angular object updated from AngularJS z object + + ANGULAR_OBJECT_CLIENT_UNBIND, // [c-s] angular object unbind from AngularJS z object + + LIST_CONFIGURATIONS, // [c-s] ask all key/value pairs of configurations + CONFIGURATIONS_INFO, // [s-c] all key/value pairs of configurations + // @param settings serialized Map<String, String> object + + CHECKPOINT_NOTEBOOK // [c-s] checkpoint notebook to storage repository + // @param noteId + // @param checkpointName + + } + + public OP op; + public Map<String, Object> data = new HashMap<String, Object>(); + public String ticket = "anonymous"; + public String principal = "anonymous"; + public String roles = ""; + + public Message(OP op) { + this.op = op; + } + + public Message put(String k, Object v) { 
+ data.put(k, v); + return this; + } + + public Object get(String k) { + return data.get(k); + } + + public <T> T getType(String key) { + return (T) data.get(key); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("Message{"); + sb.append("data=").append(data); + sb.append(", op=").append(op); + sb.append('}'); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepoTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepoTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepoTest.java new file mode 100644 index 0000000..1e83354 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepoTest.java @@ -0,0 +1,150 @@ +package org.apache.zeppelin.notebook.repo.zeppelinhub; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +import org.apache.commons.httpclient.HttpException; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.NoteInfo; +import org.apache.zeppelin.notebook.repo.zeppelinhub.rest.ZeppelinhubRestApiHandler; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.io.Files; + + +public class ZeppelinHubRepoTest { + final String TOKEN = "AAA-BBB-CCC-00"; + final String testAddr = "http://zeppelinhub.ltd"; + + private ZeppelinHubRepo repo; + private File pathOfNotebooks = new File(System.getProperty("user.dir") + "/src/test/resources/list_of_notes"); + private File pathOfNotebook = new 
File(System.getProperty("user.dir") + "/src/test/resources/note"); + + @Before + public void setUp() throws Exception { + System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, testAddr); + System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_TOKEN, "AAA-BBB-CCC-00"); + + ZeppelinConfiguration conf = new ZeppelinConfiguration(); + repo = new ZeppelinHubRepo(conf); + repo.setZeppelinhubRestApiHandler(getMockedZeppelinHandler()); + } + + private ZeppelinhubRestApiHandler getMockedZeppelinHandler() throws HttpException, IOException { + ZeppelinhubRestApiHandler mockedZeppelinhubHandler = mock(ZeppelinhubRestApiHandler.class); + + byte[] response = Files.toByteArray(pathOfNotebooks); + when(mockedZeppelinhubHandler.asyncGet("")).thenReturn(new String(response)); + + response = Files.toByteArray(pathOfNotebook); + when(mockedZeppelinhubHandler.asyncGet("AAAAA")).thenReturn(new String(response)); + + return mockedZeppelinhubHandler; + } + + @Test + public void testGetZeppelinhubUrl() { + System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, testAddr); + + ZeppelinConfiguration config = new ZeppelinConfiguration(); + ZeppelinHubRepo repository = new ZeppelinHubRepo(config); + assertThat(repository.getZeppelinHubUrl(config)).isEqualTo("http://zeppelinhub.ltd"); + + System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, "yolow"); + + config = new ZeppelinConfiguration(); + repository = new ZeppelinHubRepo(config); + assertThat(repository.getZeppelinHubUrl(config)).isEqualTo("https://www.zeppelinhub.com"); + + System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, "http://zeppelinhub.ltd:4242"); + + config = new ZeppelinConfiguration(); + repository = new ZeppelinHubRepo(config); + assertThat(repository.getZeppelinHubUrl(config)).isEqualTo("http://zeppelinhub.ltd:4242"); + + System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, "http://zeppelinhub.ltd:0"); + + config = new ZeppelinConfiguration(); + 
repository = new ZeppelinHubRepo(config); + assertThat(repository.getZeppelinHubUrl(config)).isEqualTo("http://zeppelinhub.ltd"); + } + + @Test + public void testGetZeppelinHubWsEndpoint() { + System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, testAddr); + + ZeppelinConfiguration config = new ZeppelinConfiguration(); + ZeppelinHubRepo repository = new ZeppelinHubRepo(config); + assertThat(repository.getZeppelinhubWebsocketUri(config)).isEqualTo("ws://zeppelinhub.ltd:80/async"); + + System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, "https://zeppelinhub.ltd"); + + config = new ZeppelinConfiguration(); + repository = new ZeppelinHubRepo(config); + assertThat(repository.getZeppelinhubWebsocketUri(config)).isEqualTo("wss://zeppelinhub.ltd:443/async"); + + System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, "yolow"); + + config = new ZeppelinConfiguration(); + repository = new ZeppelinHubRepo(config); + assertThat(repository.getZeppelinhubWebsocketUri(config)).isEqualTo("wss://www.zeppelinhub.com:443/async"); + + System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, "http://zeppelinhub.ltd:4242"); + + config = new ZeppelinConfiguration(); + repository = new ZeppelinHubRepo(config); + assertThat(repository.getZeppelinhubWebsocketUri(config)).isEqualTo("ws://zeppelinhub.ltd:4242/async"); + + System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, "https://www.zeppelinhub.com"); + + config = new ZeppelinConfiguration(); + repository = new ZeppelinHubRepo(config); + assertThat(repository.getZeppelinhubWebsocketUri(config)).isEqualTo("wss://www.zeppelinhub.com:443/async"); + + System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, "http://www.zeppelinhub.com"); + + config = new ZeppelinConfiguration(); + repository = new ZeppelinHubRepo(config); + assertThat(repository.getZeppelinhubWebsocketUri(config)).isEqualTo("ws://www.zeppelinhub.com:80/async"); + + 
System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, "https://www.zeppelinhub.com:4242"); + + config = new ZeppelinConfiguration(); + repository = new ZeppelinHubRepo(config); + assertThat(repository.getZeppelinhubWebsocketUri(config)).isEqualTo("wss://www.zeppelinhub.com:4242/async"); + } + + @Test + public void testGetAllNotes() throws IOException { + List<NoteInfo> notebooks = repo.list(); + assertThat(notebooks).isNotEmpty(); + assertThat(notebooks.size()).isEqualTo(3); + } + + @Test + public void testGetNote() throws IOException { + Note notebook = repo.get("AAAAA"); + assertThat(notebook).isNotNull(); + assertThat(notebook.id()).isEqualTo("2A94M5J1Z"); + } + + @Test + public void testRemoveNote() throws IOException { + // not suppose to throw + repo.remove("AAAAA"); + } + + @Test + public void testRemoveNoteError() throws IOException { + // not suppose to throw + repo.remove("BBBBB"); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClientTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClientTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClientTest.java new file mode 100644 index 0000000..b2cc81c --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClientTest.java @@ -0,0 +1,123 @@ +package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket; + +import static org.junit.Assert.*; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.mock.MockEchoWebsocketServer; +import org.apache.zeppelin.notebook.socket.Message; +import 
org.apache.zeppelin.notebook.socket.Message.OP; +import org.eclipse.jetty.websocket.api.Session; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; + +public class ZeppelinClientTest { + private Logger LOG = LoggerFactory.getLogger(ZeppelinClientTest.class); + private final int zeppelinPort = 8080; + private final String validWebsocketUrl = "ws://localhost:" + zeppelinPort + "/ws"; + private ExecutorService executor; + private MockEchoWebsocketServer echoServer; + + @Before + public void setUp() throws Exception { + startWebsocketServer(); + } + + @After + public void tearDown() throws Exception { + //tear down routine + echoServer.stop(); + executor.shutdown(); + } + + private void startWebsocketServer() throws InterruptedException { + // mock zeppelin websocket server setup + executor = Executors.newFixedThreadPool(1); + echoServer = new MockEchoWebsocketServer(zeppelinPort); + executor.submit(echoServer); + } + + @Test + public void zeppelinConnectionTest() { + try { + // Wait for websocket server to start + Thread.sleep(2000); + } catch (InterruptedException e) { + LOG.warn("Cannot wait for websocket server to start, returning"); + return; + } + // Initialize and start Zeppelin client + ZeppelinClient client = ZeppelinClient.initialize(validWebsocketUrl, "dummy token", null); + client.start(); + LOG.info("Zeppelin websocket client started"); + + // Connection to note AAAA + Session connectionA = client.getZeppelinConnection("AAAA"); + assertNotNull(connectionA); + assertTrue(connectionA.isOpen()); + + assertEquals(client.countConnectedNotes(), 1); + assertEquals(connectionA, client.getZeppelinConnection("AAAA")); + + // Connection to note BBBB + Session connectionB = client.getZeppelinConnection("BBBB"); + assertNotNull(connectionB); + assertTrue(connectionB.isOpen()); + + assertEquals(client.countConnectedNotes(), 2); + 
assertEquals(connectionB, client.getZeppelinConnection("BBBB")); + + // Remove connection to note AAAA + client.removeZeppelinConnection("AAAA"); + assertEquals(client.countConnectedNotes(), 1); + assertNotEquals(connectionA, client.getZeppelinConnection("AAAA")); + assertEquals(client.countConnectedNotes(), 2); + client.stop(); + } + + @Test + public void zeppelinClientSingletonTest() { + ZeppelinClient client1 = ZeppelinClient.getInstance(); + if (client1 == null) { + client1 = ZeppelinClient.initialize(validWebsocketUrl, "TOKEN", null); + } + assertNotNull(client1); + ZeppelinClient client2 = ZeppelinClient.getInstance(); + assertNotNull(client2); + assertEquals(client1, client2); + } + + @Test + public void zeppelinMessageSerializationTest() { + Message msg = new Message(OP.LIST_NOTES); + msg.data = Maps.newHashMap(); + msg.data.put("key", "value"); + ZeppelinClient client = ZeppelinClient.initialize(validWebsocketUrl, "TOKEN", null); + String serializedMsg = client.serialize(msg); + Message deserializedMsg = client.deserialize(serializedMsg); + assertEquals(msg.op, deserializedMsg.op); + assertEquals(msg.data.get("key"), deserializedMsg.data.get("key")); + + String invalidMsg = "random text"; + deserializedMsg =client.deserialize(invalidMsg); + assertNull(deserializedMsg); + } + + @Test + public void sendToZeppelinTest() { + ZeppelinClient client = ZeppelinClient.initialize(validWebsocketUrl, "TOKEN", null); + client.start(); + Message msg = new Message(OP.LIST_NOTES); + msg.data = Maps.newHashMap(); + msg.data.put("key", "value"); + client.send(msg, "DDDD"); + client.removeZeppelinConnection("DDDD"); + client.stop(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinhubClientTest.java ---------------------------------------------------------------------- diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinhubClientTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinhubClientTest.java new file mode 100644 index 0000000..384cfe4 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinhubClientTest.java @@ -0,0 +1,72 @@ +package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket; + +import static org.junit.Assert.*; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.ZeppelinhubClient; +import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.mock.MockEchoWebsocketServer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ZeppelinhubClientTest { + private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubClientTest.class); + private final int zeppelinPort = 8090; // port handed to the mock echo server in startWebsocketServer() + private final String validWebsocketUrl = "ws://localhost:" + zeppelinPort + "/ws"; + private ExecutorService executor; + private MockEchoWebsocketServer echoServer; + private final String runNotebookMsg = + "{\"op\":\"RUN_NOTEBOOK\"," + + "\"data\":[{\"id\":\"20150112-172845_1301897143\",\"title\":null,\"config\":{},\"params\":{},\"data\":null}," + + "{\"id\":\"20150112-172845_1301897143\",\"title\":null,\"config\":{},\"params\":{},\"data\":null}]," + + "\"meta\":{\"owner\":\"author\",\"instance\":\"my-zepp\",\"noteId\":\"2AB7SY361\"}}"; + private final String invalidRunNotebookMsg = "some random string"; + + @Before + public void setUp() throws Exception { + startWebsocketServer(); + } + + @After + public void tearDown() throws Exception { + //tear down routine + echoServer.stop(); + executor.shutdown(); + } + + private void startWebsocketServer() throws InterruptedException { + // mock zeppelin 
websocket server setup + executor = Executors.newFixedThreadPool(1); + echoServer = new MockEchoWebsocketServer(zeppelinPort); + executor.submit(echoServer); + } + + @Test + public void zeppelinhubClientSingletonTest() { + ZeppelinhubClient client1 = ZeppelinhubClient.getInstance(); + if (client1 == null) { + client1 = ZeppelinhubClient.initialize(validWebsocketUrl, "TOKEN"); + } + assertNotNull(client1); + ZeppelinhubClient client2 = ZeppelinhubClient.getInstance(); + assertNotNull(client2); + assertEquals(client1, client2); + } + + @Test + public void runAllParagraphTest() throws Exception { + Client.initialize(validWebsocketUrl, validWebsocketUrl, "TOKEN", null); + Client.getInstance().start(); + ZeppelinhubClient zeppelinhubClient = ZeppelinhubClient.getInstance(); + boolean runStatus = zeppelinhubClient.runAllParagraph("2AB7SY361", runNotebookMsg); + assertTrue(runStatus); + runStatus = zeppelinhubClient.runAllParagraph("2AB7SY361", invalidRunNotebookMsg); + assertFalse(runStatus); + Client.getInstance().stop(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/mock/MockEchoWebsocketServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/mock/MockEchoWebsocketServer.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/mock/MockEchoWebsocketServer.java new file mode 100644 index 0000000..e9959e9 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/mock/MockEchoWebsocketServer.java @@ -0,0 +1,46 @@ +package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.mock; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import 
org.slf4j.LoggerFactory; + +public class MockEchoWebsocketServer implements Runnable { + private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MockEchoWebsocketServer.class); + private Server server; + + public MockEchoWebsocketServer(int port) { + server = new Server(); + ServerConnector connector = new ServerConnector(server); + connector.setPort(port); + server.addConnector(connector); + + ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); + context.setContextPath("/"); + server.setHandler(context); + + //ServletHolder holderEvents = new ServletHolder("ws-events", MockEventServlet.class); + context.addServlet(MockEventServlet.class, "/ws/*"); + } + + public void start() throws Exception { + LOG.info("Starting mock echo websocket server"); + server.start(); + server.join(); + } + + public void stop() throws Exception { + LOG.info("Stopping mock echo websocket server"); + server.stop(); + } + + @Override + public void run() { + try { + this.start(); + } catch (Exception e) { + LOG.error("Couldn't start mock echo websocket server", e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/mock/MockEventServlet.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/mock/MockEventServlet.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/mock/MockEventServlet.java new file mode 100644 index 0000000..c84f2c3 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/mock/MockEventServlet.java @@ -0,0 +1,14 @@ +package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.mock; + +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import 
org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; + +@SuppressWarnings("serial") +public class MockEventServlet extends WebSocketServlet +{ + @Override + public void configure(WebSocketServletFactory factory) + { + factory.register(MockEventSocket.class); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/mock/MockEventSocket.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/mock/MockEventSocket.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/mock/MockEventSocket.java new file mode 100644 index 0000000..0f39b01 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/mock/MockEventSocket.java @@ -0,0 +1,38 @@ +package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.mock; + +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketAdapter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MockEventSocket extends WebSocketAdapter { + private static final Logger LOG = LoggerFactory.getLogger(MockEventServlet.class); + private Session session; + + @Override + public void onWebSocketConnect(Session session) { + super.onWebSocketConnect(session); + this.session = session; + LOG.info("Socket Connected: " + session); + } + + @Override + public void onWebSocketText(String message) { + super.onWebSocketText(message); + session.getRemote().sendStringByFuture(message); + LOG.info("Received TEXT message: {}", message); + + } + + @Override + public void onWebSocketClose(int statusCode, String reason) { + super.onWebSocketClose(statusCode, reason); + LOG.info("Socket Closed: [{}] {}", statusCode, reason); + } + + @Override + public void 
onWebSocketError(Throwable cause) { + super.onWebSocketError(cause); + LOG.error("Websocket error: {}", cause); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinhubMessageTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinhubMessageTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinhubMessageTest.java new file mode 100644 index 0000000..4cce203 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinhubMessageTest.java @@ -0,0 +1,43 @@ +package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol; + +import static org.junit.Assert.assertEquals; + +import java.util.Map; + +import org.apache.zeppelin.notebook.socket.Message.OP; +import org.junit.Test; + +import com.google.common.collect.Maps; + +public class ZeppelinhubMessageTest { + + private String msg = "{\"op\":\"LIST_NOTES\",\"data\":\"my data\",\"meta\":{\"key1\":\"val1\"}}"; + + @Test + public void testThatCanSerializeZeppelinHubMessage() { + Map<String,String> meta = Maps.newHashMap(); + meta.put("key1", "val1"); + String zeppelinHubMsg = ZeppelinhubMessage.newMessage(OP.LIST_NOTES, "my data", meta).serialize(); + + assertEquals(msg, zeppelinHubMsg); + } + + @Test + public void testThastCanDeserialiseZeppelinhubMessage() { + Map<String,String> meta = Maps.newHashMap(); + meta.put("key1", "val1"); + ZeppelinhubMessage expected = ZeppelinhubMessage.newMessage(OP.LIST_NOTES.toString(), "my data", meta); + ZeppelinhubMessage zeppelinHubMsg = ZeppelinhubMessage.deserialize(msg); + + assertEquals(expected.op, zeppelinHubMsg.op); + assertEquals(expected.data, 
zeppelinHubMsg.data); + assertEquals(expected.meta, zeppelinHubMsg.meta); + } + + @Test + public void testThatInvalidStringReturnEmptyZeppelinhubMessage() { + assertEquals(ZeppelinhubMessage.EMPTY, ZeppelinhubMessage.deserialize("")); + assertEquals(ZeppelinhubMessage.EMPTY, ZeppelinhubMessage.deserialize("dwfewewrewr")); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/test/resources/list_of_notes ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/resources/list_of_notes b/zeppelin-zengine/src/test/resources/list_of_notes new file mode 100644 index 0000000..a33ee90 --- /dev/null +++ b/zeppelin-zengine/src/test/resources/list_of_notes @@ -0,0 +1,41 @@ +[ + { + "id": "2ABSFSR35", + "name": "ES RDD", + "config": { + "looknfeel": "default", + "codeHighlightStyle": "GitHub", + "codeHighlightStyleOrig": "GitHub" + }, + "info": {}, + "paragraphs": [], + "angularObjects": {}, + "lastUpdate": 1445663884000 + }, + { + "id": "2A94M5J1Z", + "name": "Zeppelin Tutorial", + "config": { + "looknfeel": "default"}, + "info": {}, + "paragraphs": [], + "angularObjects": { + "2AMFHG1GQ": [], + "2APT3NC5T": [], + "2ANPJTJRQ": [], + "2AMJZ9R4C": [], + "2ANJFBYJS": [], + "2APKU6T1J": []}, + "lastUpdate": 1446688883000 + }, + { + "id": "2AVSEYW1R", + "name": "NFLabs - Tracking beta", + "config": { + "looknfeel": "default"}, + "info": {}, + "paragraphs": [], + "angularObjects": {}, + "lastUpdate": 1445663888000 + } +] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/test/resources/note ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/resources/note b/zeppelin-zengine/src/test/resources/note new file mode 100644 index 0000000..256f6ef --- /dev/null +++ b/zeppelin-zengine/src/test/resources/note @@ -0,0 +1,16 @@ + { + "id": "2A94M5J1Z", + "name": 
"Zeppelin Tutorial", + "config": { + "looknfeel": "default"}, + "info": {}, + "paragraphs": [], + "angularObjects": { + "2AMFHG1GQ": [], + "2APT3NC5T": [], + "2ANPJTJRQ": [], + "2AMJZ9R4C": [], + "2ANJFBYJS": [], + "2APKU6T1J": []}, + "lastUpdate": 1446688883000 + }
