Repository: zeppelin
Updated Branches:
  refs/heads/master 65e1d3645 -> 42bcf4206


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/42bcf420/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java
index 197a135..74cb25d 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = 
"2016-11-29")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = 
"2017-1-25")
 public class ZeppelinServerResourceParagraphRunner implements 
org.apache.thrift.TBase<ZeppelinServerResourceParagraphRunner, 
ZeppelinServerResourceParagraphRunner._Fields>, java.io.Serializable, 
Cloneable, Comparable<ZeppelinServerResourceParagraphRunner> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new 
org.apache.thrift.protocol.TStruct("ZeppelinServerResourceParagraphRunner");
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/42bcf420/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/LocalResourcePool.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/LocalResourcePool.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/LocalResourcePool.java
index ef269e4..5a0193c 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/LocalResourcePool.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/LocalResourcePool.java
@@ -72,7 +72,7 @@ public class LocalResourcePool implements ResourcePool {
   public void put(String name, Object object) {
     ResourceId resourceId = new ResourceId(resourcePoolId, name);
 
-    Resource resource = new Resource(resourceId, object);
+    Resource resource = new Resource(this, resourceId, object);
     resources.put(resourceId, resource);
   }
 
@@ -80,7 +80,7 @@ public class LocalResourcePool implements ResourcePool {
   public void put(String noteId, String paragraphId, String name, Object 
object) {
     ResourceId resourceId = new ResourceId(resourcePoolId, noteId, 
paragraphId, name);
 
-    Resource resource = new Resource(resourceId, object);
+    Resource resource = new Resource(this, resourceId, object);
     resources.put(resourceId, resource);
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/42bcf420/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java
index 5a8a9ea..6342231 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java
@@ -23,11 +23,11 @@ public class RemoteResource extends Resource {
   ResourcePoolConnector resourcePoolConnector;
 
   RemoteResource(ResourceId resourceId, Object r) {
-    super(resourceId, r);
+    super(null, resourceId, r);
   }
 
   RemoteResource(ResourceId resourceId, boolean serializable, String 
className) {
-    super(resourceId, serializable, className);
+    super(null, resourceId, serializable, className);
   }
 
   @Override
@@ -52,4 +52,43 @@ public class RemoteResource extends Resource {
   public void setResourcePoolConnector(ResourcePoolConnector 
resourcePoolConnector) {
     this.resourcePoolConnector = resourcePoolConnector;
   }
+
+  /**
+   * Call a method of the object that this remote resource holds
+   * @param methodName name of method to call
+   * @param paramTypes method parameter types
+   * @param params method parameter values
+   * @return return value of the method. Null if return value is not 
serializable
+   */
+  @Override
+  public Object invokeMethod(
+      String methodName, Class [] paramTypes, Object [] params) {
+    ResourceId resourceId = getResourceId();
+    return resourcePoolConnector.invokeMethod(
+        resourceId,
+        methodName,
+        paramTypes,
+        params);
+  }
+
+  /**
+   * Call a method of the object that this remote resource holds and save the
return value as a resource
+   * @param methodName name of method to call
+   * @param paramTypes method parameter types
+   * @param params method parameter values
+   * @param returnResourceName name of the resource under which the return value will be saved
+   * @return Resource that holds return value.
+   */
+  @Override
+  public Resource invokeMethod(
+      String methodName, Class [] paramTypes, Object [] params, String 
returnResourceName) {
+    ResourceId resourceId = getResourceId();
+    Resource resource = resourcePoolConnector.invokeMethod(
+        resourceId,
+        methodName,
+        paramTypes,
+        params,
+        returnResourceName);
+    return resource;
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/42bcf420/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java
index 6988b3e..a478c42 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java
@@ -16,7 +16,11 @@
  */
 package org.apache.zeppelin.resource;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.*;
+import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 
 /**
@@ -24,6 +28,7 @@ import java.nio.ByteBuffer;
  */
 public class Resource {
   private final transient Object r;
+  private final transient LocalResourcePool pool;
   private final boolean serializable;
   private final ResourceId resourceId;
   private final String className;
@@ -31,11 +36,13 @@ public class Resource {
 
   /**
    * Create local resource
+   *
    * @param resourceId
-   * @param r must not be null
+   * @param r          must not be null
    */
-  Resource(ResourceId resourceId, Object r) {
+  Resource(LocalResourcePool pool, ResourceId resourceId, Object r) {
     this.r = r;
+    this.pool = pool;
     this.resourceId = resourceId;
     this.serializable = r instanceof Serializable;
     this.className = r.getClass().getName();
@@ -43,10 +50,12 @@ public class Resource {
 
   /**
    * Create remote object
+   *
    * @param resourceId
    */
-  Resource(ResourceId resourceId, boolean serializable, String className) {
+  Resource(LocalResourcePool pool, ResourceId resourceId, boolean 
serializable, String className) {
     this.r = null;
+    this.pool = pool;
     this.resourceId = resourceId;
     this.serializable = serializable;
     this.className = className;
@@ -61,11 +70,10 @@ public class Resource {
   }
 
   /**
-   *
    * @return null when this is remote resource and not serializable.
    */
   public Object get() {
-    if (isLocal() || isSerializable()){
+    if (isLocal() || isSerializable()) {
       return r;
     } else {
       return null;
@@ -78,6 +86,7 @@ public class Resource {
 
   /**
    * if it is remote object
+   *
    * @return
    */
   public boolean isRemote() {
@@ -86,6 +95,7 @@ public class Resource {
 
   /**
    * Whether it is locally accessible or not
+   *
    * @return
    */
   public boolean isLocal() {
@@ -93,6 +103,65 @@ public class Resource {
   }
 
 
+  /**
+   * Call a method of the object that this resource holds
+   * @param methodName name of method to call
+   * @param paramTypes method parameter types
+   * @param params method parameter values
+   * @return return value of the method
+   */
+  public Object invokeMethod(
+      String methodName, Class [] paramTypes, Object [] params) {
+    if (r != null) {
+      try {
+        Method method = r.getClass().getMethod(
+            methodName,
+            paramTypes);
+        Object ret = method.invoke(r, params);
+        return ret;
+      }  catch (Exception e) {
+        logException(e);
+        return null;
+      }
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Call a method of the object that this resource holds and save the return
value as a resource
+   * @param methodName name of method to call
+   * @param paramTypes method parameter types
+   * @param params method parameter values
+   * @param returnResourceName name of the resource under which the return value will be saved
+   * @return Resource that holds return value
+   */
+  public Resource invokeMethod(
+      String methodName, Class [] paramTypes, Object [] params, String 
returnResourceName) {
+    if (r != null) {
+      try {
+        Method method = r.getClass().getMethod(
+            methodName,
+            paramTypes);
+        Object ret = method.invoke(r, params);
+        pool.put(
+            resourceId.getNoteId(),
+            resourceId.getParagraphId(),
+            returnResourceName,
+            ret
+        );
+        return pool.get(
+            resourceId.getNoteId(),
+            resourceId.getParagraphId(),
+            returnResourceName);
+      } catch (Exception e) {
+        logException(e);
+        return null;
+      }
+    } else {
+      return null;
+    }
+  }
 
   public static ByteBuffer serializeObject(Object o) throws IOException {
     if (o == null || !(o instanceof Serializable)) {
@@ -129,4 +198,8 @@ public class Resource {
     return object;
   }
 
+  private void logException(Exception e) {
+    Logger logger = LoggerFactory.getLogger(Resource.class);
+    logger.error(e.getMessage(), e);
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/42bcf420/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolConnector.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolConnector.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolConnector.java
index af343db..f270d92 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolConnector.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolConnector.java
@@ -31,4 +31,24 @@ public interface ResourcePoolConnector {
    * @return
    */
   public Object readResource(ResourceId id);
+
+  /**
+   * Invoke a method of the resource identified by id and return its result
+   * @return
+   */
+  public Object invokeMethod(
+      ResourceId id,
+      String methodName,
+      Class[] paramTypes,
+      Object[] params);
+
+  /**
+   * Invoke a method, put the result into the resource pool, and return the resulting resource
+   */
+  public Resource invokeMethod(
+      ResourceId id,
+      String methodName,
+      Class[] paramTypes,
+      Object[] params,
+      String returnResourceName);
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/42bcf420/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift 
b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
index 50a5eb7..08a15ad 100644
--- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
+++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
@@ -55,7 +55,8 @@ enum RemoteInterpreterEventType {
   ANGULAR_REGISTRY_PUSH = 11,
   APP_STATUS_UPDATE = 12,
   META_INFOS = 13,
-  REMOTE_ZEPPELIN_SERVER_RESOURCE = 14
+  REMOTE_ZEPPELIN_SERVER_RESOURCE = 14,
+  RESOURCE_INVOKE_METHOD = 15
 }
 
 
@@ -105,12 +106,16 @@ service RemoteInterpreterService {
   void resourcePoolResponseGetAll(1: list<string> resources);
   // as a response, ZeppelinServer send serialized value of resource
   void resourceResponseGet(1: string resourceId, 2: binary object);
+  // as a response, ZeppelinServer sends the returned object
+  void resourceResponseInvokeMethod(1: string invokeMessage, 2: binary object);
   // get all resources in the interpreter process
   list<string> resourcePoolGetAll();
   // get value of resource
   binary resourceGet(1: string sessionKey, 2: string paragraphId, 3: string 
resourceName);
   // remove resource
   bool resourceRemove(1: string sessionKey, 2: string paragraphId, 3:string 
resourceName);
+  // invoke method on resource
+  binary resourceInvokeMethod(1: string sessionKey, 2: string paragraphId, 
3:string resourceName, 4:string invokeMessage);
 
   void angularObjectUpdate(1: string name, 2: string sessionKey, 3: string 
paragraphId, 4: string
   object);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/42bcf420/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
index e49306d..80ac555 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
@@ -68,7 +68,7 @@ public class MockInterpreterResourcePool extends Interpreter {
     String name = null;
     if (stmt.length >= 2) {
       String[] npn = stmt[1].split(":");
-      if (npn.length == 3) {
+      if (npn.length >= 3) {
         noteId = npn[0];
         paragraphId = npn[1];
         name = npn[2];
@@ -77,7 +77,7 @@ public class MockInterpreterResourcePool extends Interpreter {
       }
     }
     String value = null;
-    if (stmt.length == 3) {
+    if (stmt.length >= 3) {
       value = stmt[2];
     }
 
@@ -96,6 +96,14 @@ public class MockInterpreterResourcePool extends Interpreter 
{
       ret = resourcePool.remove(noteId, paragraphId, name);
     } else if (cmd.equals("getAll")) {
       ret = resourcePool.getAll();
+    } else if (cmd.equals("invoke")) {
+      Resource resource = resourcePool.get(noteId, paragraphId, name);
+      if (stmt.length >=4) {
+        Resource res = resource.invokeMethod(value, null, null, stmt[3]);
+        ret = res.get();
+      } else {
+        ret = resource.invokeMethod(value, null, null);
+      }
     }
 
     try {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/42bcf420/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
index 8095043..363ccf6 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
@@ -190,6 +190,17 @@ public class DistributedResourcePoolTest {
         }
         return null;
       }
+
+      @Override
+      public Object invokeMethod(ResourceId id, String methodName, Class[] 
paramTypes, Object[] params) {
+        return null;
+      }
+
+      @Override
+      public Resource invokeMethod(ResourceId id, String methodName, Class[] 
paramTypes, Object[]
+          params, String returnResourceName) {
+        return null;
+      }
     });
 
     assertEquals(0, pool1.getAll().size());
@@ -249,4 +260,44 @@ public class DistributedResourcePoolTest {
         String.class));
 
   }
+
+  @Test
+  public void testResourceInvokeMethod() {
+    Gson gson = new Gson();
+    InterpreterResult ret;
+    intp1.interpret("put key1 hey", context);
+    intp2.interpret("put key2 world", context);
+
+    // invoke method in local resource pool
+    ret = intp1.interpret("invoke key1 length", context);
+    assertEquals("3", ret.message().get(0).getData());
+
+    // invoke method in remote resource pool
+    ret = intp1.interpret("invoke key2 length", context);
+    assertEquals("5", ret.message().get(0).getData());
+
+    // make sure no resources are automatically created
+    ret = intp1.interpret("getAll", context);
+    assertEquals(2, gson.fromJson(ret.message().get(0).getData(), 
ResourceSet.class).size());
+
+    // invoke method in local resource pool and save result
+    ret = intp1.interpret("invoke key1 length ret1", context);
+    assertEquals("3", ret.message().get(0).getData());
+
+    ret = intp1.interpret("getAll", context);
+    assertEquals(3, gson.fromJson(ret.message().get(0).getData(), 
ResourceSet.class).size());
+
+    ret = intp1.interpret("get ret1", context);
+    assertEquals("3", gson.fromJson(ret.message().get(0).getData(), 
String.class));
+
+    // invoke method in remote resource pool and save result
+    ret = intp1.interpret("invoke key2 length ret2", context);
+    assertEquals("5", ret.message().get(0).getData());
+
+    ret = intp1.interpret("getAll", context);
+    assertEquals(4, gson.fromJson(ret.message().get(0).getData(), 
ResourceSet.class).size());
+
+    ret = intp1.interpret("get ret2", context);
+    assertEquals("5", gson.fromJson(ret.message().get(0).getData(), 
String.class));
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/42bcf420/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceSetTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceSetTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceSetTest.java
index ca64525..cc1cad1 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceSetTest.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceSetTest.java
@@ -29,8 +29,8 @@ public class ResourceSetTest {
   public void testFilterByName() {
     ResourceSet set = new ResourceSet();
 
-    set.add(new Resource(new ResourceId("poo1", "resource1"), "value1"));
-    set.add(new Resource(new ResourceId("poo1", "resource2"), new Integer(2)));
+    set.add(new Resource(null, new ResourceId("poo1", "resource1"), "value1"));
+    set.add(new Resource(null, new ResourceId("poo1", "resource2"), new 
Integer(2)));
     assertEquals(2, set.filterByNameRegex(".*").size());
     assertEquals(1, set.filterByNameRegex("resource1").size());
     assertEquals(1, set.filterByNameRegex("resource2").size());
@@ -42,8 +42,8 @@ public class ResourceSetTest {
   public void testFilterByClassName() {
     ResourceSet set = new ResourceSet();
 
-    set.add(new Resource(new ResourceId("poo1", "resource1"), "value1"));
-    set.add(new Resource(new ResourceId("poo1", "resource2"), new Integer(2)));
+    set.add(new Resource(null, new ResourceId("poo1", "resource1"), "value1"));
+    set.add(new Resource(null, new ResourceId("poo1", "resource2"), new 
Integer(2)));
 
     assertEquals(1, set.filterByClassnameRegex(".*String").size());
     assertEquals(1, set.filterByClassnameRegex(String.class.getName()).size());

Reply via email to