Repository: zeppelin Updated Branches: refs/heads/master 65e1d3645 -> 42bcf4206
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/42bcf420/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java index 197a135..74cb25d 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java @@ -51,7 +51,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-29") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-1-25") public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.TBase<ZeppelinServerResourceParagraphRunner, ZeppelinServerResourceParagraphRunner._Fields>, java.io.Serializable, Cloneable, Comparable<ZeppelinServerResourceParagraphRunner> { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ZeppelinServerResourceParagraphRunner"); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/42bcf420/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/LocalResourcePool.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/LocalResourcePool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/LocalResourcePool.java index ef269e4..5a0193c 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/LocalResourcePool.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/LocalResourcePool.java @@ -72,7 +72,7 @@ public class LocalResourcePool implements ResourcePool { public void put(String name, Object object) { ResourceId resourceId = new ResourceId(resourcePoolId, name); - Resource resource = new Resource(resourceId, object); + Resource resource = new Resource(this, resourceId, object); resources.put(resourceId, resource); } @@ -80,7 +80,7 @@ public class LocalResourcePool implements ResourcePool { public void put(String noteId, String paragraphId, String name, Object object) { ResourceId resourceId = new ResourceId(resourcePoolId, noteId, paragraphId, name); - Resource resource = new Resource(resourceId, object); + Resource resource = new Resource(this, resourceId, object); resources.put(resourceId, resource); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/42bcf420/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java index 5a8a9ea..6342231 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java @@ -23,11 +23,11 @@ public class RemoteResource extends Resource { ResourcePoolConnector resourcePoolConnector; RemoteResource(ResourceId resourceId, Object r) { - super(resourceId, r); + super(null, resourceId, r); } RemoteResource(ResourceId resourceId, boolean serializable, String className) { - super(resourceId, serializable, className); + super(null, resourceId, serializable, className); } @Override @@ -52,4 +52,43 @@ public class RemoteResource extends Resource { public void setResourcePoolConnector(ResourcePoolConnector resourcePoolConnector) { this.resourcePoolConnector = resourcePoolConnector; } + + /** + * Call a method of the object that this remote resource holds + * @param methodName name of method to call + * @param paramTypes method parameter types + * @param params method parameter values + * @return return value of the method. Null if return value is not serializable + */ + @Override + public Object invokeMethod( + String methodName, Class [] paramTypes, Object [] params) { + ResourceId resourceId = getResourceId(); + return resourcePoolConnector.invokeMethod( + resourceId, + methodName, + paramTypes, + params); + } + + /** + * Call a method of the object that this remote resource holds and save return value as a resource + * @param methodName name of method to call + * @param paramTypes method parameter types + * @param params method parameter values + * @param returnResourceName name of resource that return value will be saved + * @return Resource that holds return value. + */ + @Override + public Resource invokeMethod( + String methodName, Class [] paramTypes, Object [] params, String returnResourceName) { + ResourceId resourceId = getResourceId(); + Resource resource = resourcePoolConnector.invokeMethod( + resourceId, + methodName, + paramTypes, + params, + returnResourceName); + return resource; + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/42bcf420/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java index 6988b3e..a478c42 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java @@ -16,7 +16,11 @@ */ package org.apache.zeppelin.resource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.*; +import java.lang.reflect.Method; import java.nio.ByteBuffer; /** @@ -24,6 +28,7 @@ import java.nio.ByteBuffer; */ public class Resource { private final transient Object r; + private final transient LocalResourcePool pool; private final boolean serializable; private final ResourceId resourceId; private final String className; @@ -31,11 +36,13 @@ public class Resource { /** * Create local resource + * * @param resourceId - * @param r must not be null + * @param r must not be null */ - Resource(ResourceId resourceId, Object r) { + Resource(LocalResourcePool pool, ResourceId resourceId, Object r) { this.r = r; + this.pool = pool; this.resourceId = resourceId; this.serializable = r instanceof Serializable; this.className = r.getClass().getName(); @@ -43,10 +50,12 @@ public class Resource { /** * Create remote object + * * @param resourceId */ - Resource(ResourceId resourceId, boolean serializable, String className) { + Resource(LocalResourcePool pool, ResourceId resourceId, boolean serializable, String className) { this.r = null; + this.pool = pool; this.resourceId = resourceId; this.serializable = serializable; this.className = className; @@ -61,11 +70,10 @@ public class Resource { } /** - * * @return null when this is remote resource and not serializable. */ public Object get() { - if (isLocal() || isSerializable()){ + if (isLocal() || isSerializable()) { return r; } else { return null; @@ -78,6 +86,7 @@ public class Resource { /** * if it is remote object + * * @return */ public boolean isRemote() { @@ -86,6 +95,7 @@ public class Resource { /** * Whether it is locally accessible or not + * * @return */ public boolean isLocal() { @@ -93,6 +103,65 @@ public class Resource { } + /** + * Call a method of the object that this resource holds + * @param methodName name of method to call + * @param paramTypes method parameter types + * @param params method parameter values + * @return return value of the method + */ + public Object invokeMethod( + String methodName, Class [] paramTypes, Object [] params) { + if (r != null) { + try { + Method method = r.getClass().getMethod( + methodName, + paramTypes); + Object ret = method.invoke(r, params); + return ret; + } catch (Exception e) { + logException(e); + return null; + } + } else { + return null; + } + } + + /** + * Call a method of the object that this resource holds and save return value as a resource + * @param methodName name of method to call + * @param paramTypes method parameter types + * @param params method parameter values + * @param returnResourceName name of resource that return value will be saved + * @return Resource that holds return value + */ + public Resource invokeMethod( + String methodName, Class [] paramTypes, Object [] params, String returnResourceName) { + if (r != null) { + try { + Method method = r.getClass().getMethod( + methodName, + paramTypes); + Object ret = method.invoke(r, params); + pool.put( + resourceId.getNoteId(), + resourceId.getParagraphId(), + returnResourceName, + ret + ); + return pool.get( + resourceId.getNoteId(), + resourceId.getParagraphId(), + returnResourceName); + } catch (Exception e) { + logException(e); + return null; + } + } else { + return null; + } + } public static ByteBuffer serializeObject(Object o) throws IOException { if (o == null || !(o instanceof Serializable)) { @@ -129,4 +198,8 @@ public class Resource { return object; } + private void logException(Exception e) { + Logger logger = LoggerFactory.getLogger(Resource.class); + logger.error(e.getMessage(), e); + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/42bcf420/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolConnector.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolConnector.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolConnector.java index af343db..f270d92 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolConnector.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolConnector.java @@ -31,4 +31,24 @@ public interface ResourcePoolConnector { * @return */ public Object readResource(ResourceId id); + + /** + * Invoke method of Resource and get return + * @return + */ + public Object invokeMethod( + ResourceId id, + String methodName, + Class[] paramTypes, + Object[] params); + + /** + * Invoke method, put result into resource pool and return + */ + public Resource invokeMethod( + ResourceId id, + String methodName, + Class[] paramTypes, + Object[] params, + String returnResourceName); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/42bcf420/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift index 50a5eb7..08a15ad 100644 --- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift +++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift @@ -55,7 +55,8 @@ enum RemoteInterpreterEventType { ANGULAR_REGISTRY_PUSH = 11, APP_STATUS_UPDATE = 12, META_INFOS = 13, - REMOTE_ZEPPELIN_SERVER_RESOURCE = 14 + REMOTE_ZEPPELIN_SERVER_RESOURCE = 14, + RESOURCE_INVOKE_METHOD = 15 } @@ -105,12 +106,16 @@ service RemoteInterpreterService { void resourcePoolResponseGetAll(1: list<string> resources); // as a response, ZeppelinServer send serialized value of resource void resourceResponseGet(1: string resourceId, 2: binary object); + // as a response, ZeppelinServer send return object + void resourceResponseInvokeMethod(1: string invokeMessage, 2: binary object); // get all resources in the interpreter process list<string> resourcePoolGetAll(); // get value of resource binary resourceGet(1: string sessionKey, 2: string paragraphId, 3: string resourceName); // remove resource bool resourceRemove(1: string sessionKey, 2: string paragraphId, 3:string resourceName); + // invoke method on resource + binary resourceInvokeMethod(1: string sessionKey, 2: string paragraphId, 3:string resourceName, 4:string invokeMessage); void angularObjectUpdate(1: string name, 2: string sessionKey, 3: string paragraphId, 4: string object); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/42bcf420/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java index e49306d..80ac555 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java @@ -68,7 +68,7 @@ public class MockInterpreterResourcePool extends Interpreter { String name = null; if (stmt.length >= 2) { String[] npn = stmt[1].split(":"); - if (npn.length == 3) { + if (npn.length >= 3) { noteId = npn[0]; paragraphId = npn[1]; name = npn[2]; @@ -77,7 +77,7 @@ public class MockInterpreterResourcePool extends Interpreter { } } String value = null; - if (stmt.length == 3) { + if (stmt.length >= 3) { value = stmt[2]; } @@ -96,6 +96,14 @@ public class MockInterpreterResourcePool extends Interpreter { ret = resourcePool.remove(noteId, paragraphId, name); } else if (cmd.equals("getAll")) { ret = resourcePool.getAll(); + } else if (cmd.equals("invoke")) { + Resource resource = resourcePool.get(noteId, paragraphId, name); + if (stmt.length >=4) { + Resource res = resource.invokeMethod(value, null, null, stmt[3]); + ret = res.get(); + } else { + ret = resource.invokeMethod(value, null, null); + } } try { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/42bcf420/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java index 8095043..363ccf6 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java @@ -190,6 +190,17 @@ public class DistributedResourcePoolTest { } return null; } + + @Override + public Object invokeMethod(ResourceId id, String methodName, Class[] paramTypes, Object[] params) { + return null; + } + + @Override + public Resource invokeMethod(ResourceId id, String methodName, Class[] paramTypes, Object[] + params, String returnResourceName) { + return null; + } }); assertEquals(0, pool1.getAll().size()); @@ -249,4 +260,44 @@ public class DistributedResourcePoolTest { String.class)); } + + @Test + public void testResourceInvokeMethod() { + Gson gson = new Gson(); + InterpreterResult ret; + intp1.interpret("put key1 hey", context); + intp2.interpret("put key2 world", context); + + // invoke method in local resource pool + ret = intp1.interpret("invoke key1 length", context); + assertEquals("3", ret.message().get(0).getData()); + + // invoke method in remote resource pool + ret = intp1.interpret("invoke key2 length", context); + assertEquals("5", ret.message().get(0).getData()); + + // make sure no resources are automatically created + ret = intp1.interpret("getAll", context); + assertEquals(2, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size()); + + // invoke method in local resource pool and save result + ret = intp1.interpret("invoke key1 length ret1", context); + assertEquals("3", ret.message().get(0).getData()); + + ret = intp1.interpret("getAll", context); + assertEquals(3, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size()); + + ret = intp1.interpret("get ret1", context); + assertEquals("3", gson.fromJson(ret.message().get(0).getData(), String.class)); + + // invoke method in remote resource pool and save result + ret = intp1.interpret("invoke key2 length ret2", context); + assertEquals("5", ret.message().get(0).getData()); + + ret = intp1.interpret("getAll", context); + assertEquals(4, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size()); + + ret = intp1.interpret("get ret2", context); + assertEquals("5", gson.fromJson(ret.message().get(0).getData(), String.class)); + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/42bcf420/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceSetTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceSetTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceSetTest.java index ca64525..cc1cad1 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceSetTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceSetTest.java @@ -29,8 +29,8 @@ public class ResourceSetTest { public void testFilterByName() { ResourceSet set = new ResourceSet(); - set.add(new Resource(new ResourceId("poo1", "resource1"), "value1")); - set.add(new Resource(new ResourceId("poo1", "resource2"), new Integer(2))); + set.add(new Resource(null, new ResourceId("poo1", "resource1"), "value1")); + set.add(new Resource(null, new ResourceId("poo1", "resource2"), new Integer(2))); assertEquals(2, set.filterByNameRegex(".*").size()); assertEquals(1, set.filterByNameRegex("resource1").size()); assertEquals(1, set.filterByNameRegex("resource2").size()); @@ -42,8 +42,8 @@ public class ResourceSetTest { public void testFilterByClassName() { ResourceSet set = new ResourceSet(); - set.add(new Resource(new ResourceId("poo1", "resource1"), "value1")); - set.add(new Resource(new ResourceId("poo1", "resource2"), new Integer(2))); + set.add(new Resource(null, new ResourceId("poo1", "resource1"), "value1")); + set.add(new Resource(null, new ResourceId("poo1", "resource2"), new Integer(2))); assertEquals(1, set.filterByClassnameRegex(".*String").size()); assertEquals(1, set.filterByClassnameRegex(String.class.getName()).size());
