APEXCORE-144, APEXCORE-145 REST API changes to view module information

Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/52187aa2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/52187aa2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/52187aa2

Branch: refs/heads/feature-module
Commit: 52187aa2d3ba44b6102860c155783b51a0805e14
Parents: eda1b60
Author: shubham <[email protected]>
Authored: Tue Nov 17 12:10:08 2015 +0530
Committer: Tushar R. Gosavi <[email protected]>
Committed: Fri Dec 18 11:42:12 2015 +0530

----------------------------------------------------------------------
 .../stram/StreamingContainerManager.java        | 125 ++++++++++++---
 .../java/com/datatorrent/stram/cli/DTCli.java   |   8 +-
 .../stram/codec/LogicalPlanSerializer.java      |  98 +++++++++---
 .../stram/plan/logical/LogicalPlan.java         |   1 +
 .../stram/webapp/StramWebServices.java          | 156 +++++++++++++------
 5 files changed, 294 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/52187aa2/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java 
b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index ca724db..c61a9fc 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -18,33 +18,47 @@
  */
 package com.datatorrent.stram;
 
-import java.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
 import java.lang.management.ManagementFactory;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.net.URI;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.Nullable;
 
-import com.datatorrent.netlet.util.DTThrowable;
-import com.esotericsoftware.kryo.KryoException;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicate;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import net.engio.mbassy.bus.MBassador;
-import net.engio.mbassy.bus.config.BusConfiguration;
-
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
@@ -71,13 +85,30 @@ import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.webapp.NotFoundException;
 
-import com.datatorrent.api.*;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Operator.InputPort;
 import com.datatorrent.api.Operator.OutputPort;
 import com.datatorrent.api.Stats.OperatorStats;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.StorageAgent;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.StringCodec;
 import com.datatorrent.api.annotation.Stateless;
-
 import com.datatorrent.bufferserver.auth.AuthManager;
 import com.datatorrent.bufferserver.util.Codec;
 import com.datatorrent.common.experimental.AppData;
@@ -85,34 +116,62 @@ import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.FSStorageAgent;
 import com.datatorrent.common.util.NumberAggregate;
 import com.datatorrent.common.util.Pair;
+import com.datatorrent.netlet.util.DTThrowable;
 import com.datatorrent.stram.Journal.Recoverable;
 import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest;
-import com.datatorrent.stram.api.*;
-import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.*;
+import com.datatorrent.stram.api.AppDataSource;
+import com.datatorrent.stram.api.Checkpoint;
+import com.datatorrent.stram.api.ContainerContext;
+import com.datatorrent.stram.api.OperatorDeployInfo;
+import com.datatorrent.stram.api.StramEvent;
+import com.datatorrent.stram.api.StramToNodeChangeLoggersRequest;
+import com.datatorrent.stram.api.StramToNodeGetPropertyRequest;
+import com.datatorrent.stram.api.StramToNodeSetPropertyRequest;
+import com.datatorrent.stram.api.StramToNodeStartRecordingRequest;
+import 
com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHeartbeat;
+import 
com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse;
+import 
com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats;
+import 
com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat;
+import 
com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.StramToNodeRequest;
+import 
com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.StreamingContainerContext;
 import com.datatorrent.stram.engine.OperatorResponse;
 import com.datatorrent.stram.engine.StreamingContainer;
 import com.datatorrent.stram.engine.WindowGenerator;
 import com.datatorrent.stram.plan.logical.LogicalOperatorStatus;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
+import com.datatorrent.stram.plan.logical.LogicalPlan.ModuleMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta;
+import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
 import com.datatorrent.stram.plan.logical.Operators;
 import com.datatorrent.stram.plan.logical.Operators.PortContextPair;
 import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest;
-import com.datatorrent.stram.plan.physical.*;
+import com.datatorrent.stram.plan.physical.OperatorStatus;
 import com.datatorrent.stram.plan.physical.OperatorStatus.PortStatus;
+import com.datatorrent.stram.plan.physical.PTContainer;
+import com.datatorrent.stram.plan.physical.PTOperator;
 import com.datatorrent.stram.plan.physical.PTOperator.PTInput;
 import com.datatorrent.stram.plan.physical.PTOperator.PTOutput;
 import com.datatorrent.stram.plan.physical.PTOperator.State;
+import com.datatorrent.stram.plan.physical.PhysicalPlan;
 import com.datatorrent.stram.plan.physical.PhysicalPlan.PlanContext;
+import com.datatorrent.stram.plan.physical.PlanModifier;
 import com.datatorrent.stram.util.ConfigUtils;
 import com.datatorrent.stram.util.FSJsonLineFile;
 import com.datatorrent.stram.util.MovingAverage.MovingAverageLong;
 import com.datatorrent.stram.util.SharedPubSubWebSocketClient;
 import com.datatorrent.stram.util.WebServicesClient;
-import com.datatorrent.stram.webapp.*;
+import com.datatorrent.stram.webapp.ContainerInfo;
+import com.datatorrent.stram.webapp.LogicalOperatorInfo;
+import com.datatorrent.stram.webapp.OperatorAggregationInfo;
+import com.datatorrent.stram.webapp.OperatorInfo;
+import com.datatorrent.stram.webapp.PortInfo;
+import com.datatorrent.stram.webapp.StreamInfo;
+
+import net.engio.mbassy.bus.MBassador;
+import net.engio.mbassy.bus.config.BusConfiguration;
 
 /**
  * Tracks topology provisioning/allocation to containers<p>
@@ -2246,6 +2305,24 @@ public class StreamingContainerManager implements 
PlanContext
     return fillLogicalOperatorInfo(operatorMeta);
   }
 
+  public ModuleMeta getModuleMeta(String moduleName)
+  {
+    return getModuleMeta(moduleName, getLogicalPlan());
+  }
+
+  private ModuleMeta getModuleMeta(String moduleName, LogicalPlan dag)
+  {
+    for (ModuleMeta m : dag.getAllModules()) {
+      if (m.getFullName().equals(moduleName)) {
+        return m;
+      }
+      ModuleMeta res = getModuleMeta(moduleName, m.getDag());
+      if (res != null)
+        return res;
+    }
+    return null;
+  }
+
   public List<LogicalOperatorInfo> getLogicalOperatorInfoList()
   {
     List<LogicalOperatorInfo> infoList = new ArrayList<LogicalOperatorInfo>();

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/52187aa2/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java 
b/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java
index deb0967..696f497 100644
--- a/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java
+++ b/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java
@@ -2824,7 +2824,7 @@ public class DTCli
               }
               LogicalPlan logicalPlan = 
appFactory.createApp(submitApp.getLogicalPlanConfiguration());
               map.put("applicationName", appFactory.getName());
-              map.put("logicalPlan", 
LogicalPlanSerializer.convertToMap(logicalPlan));
+              map.put("logicalPlan", 
LogicalPlanSerializer.convertToMap(logicalPlan, false));
             } finally {
               if (raw) {
                 System.setOut(originalStream);
@@ -2840,7 +2840,7 @@ public class DTCli
             LogicalPlan logicalPlan = 
appFactory.createApp(submitApp.getLogicalPlanConfiguration());
             Map<String, Object> map = new HashMap<String, Object>();
             map.put("applicationName", appFactory.getName());
-            map.put("logicalPlan", 
LogicalPlanSerializer.convertToMap(logicalPlan));
+            map.put("logicalPlan", 
LogicalPlanSerializer.convertToMap(logicalPlan, false));
             printJson(map);
           } else if (filename.endsWith(".properties")) {
             File file = new File(filename);
@@ -2849,7 +2849,7 @@ public class DTCli
             LogicalPlan logicalPlan = 
appFactory.createApp(submitApp.getLogicalPlanConfiguration());
             Map<String, Object> map = new HashMap<String, Object>();
             map.put("applicationName", appFactory.getName());
-            map.put("logicalPlan", 
LogicalPlanSerializer.convertToMap(logicalPlan));
+            map.put("logicalPlan", 
LogicalPlanSerializer.convertToMap(logicalPlan, false));
             printJson(map);
           } else {
             StramAppLauncher submitApp = getStramAppLauncher(filename, config, 
commandLineInfo.ignorePom);
@@ -2893,7 +2893,7 @@ public class DTCli
               Map<String, Object> map = new HashMap<String, Object>();
               map.put("applicationName", appInfo.name);
               if (appInfo.dag != null) {
-                map.put("logicalPlan", 
LogicalPlanSerializer.convertToMap(appInfo.dag));
+                map.put("logicalPlan", 
LogicalPlanSerializer.convertToMap(appInfo.dag, false));
               }
               if (appInfo.error != null) {
                 map.put("error", appInfo.error);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/52187aa2/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java 
b/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
index 90dd2b5..49295d8 100644
--- 
a/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
+++ 
b/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
@@ -18,36 +18,53 @@
  */
 package com.datatorrent.stram.codec;
 
-import com.datatorrent.api.*;
-import com.datatorrent.api.Attribute;
-import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.Operator.InputPort;
-import com.datatorrent.api.Operator.OutputPort;
-import com.datatorrent.common.util.ObjectMapperString;
-import com.datatorrent.stram.plan.logical.*;
-import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
-import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
-import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta;
-import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
-import com.datatorrent.stram.plan.logical.Operators.PortContextPair;
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
 import javax.ws.rs.Produces;
 import javax.ws.rs.ext.Provider;
-import org.apache.commons.beanutils.BeanMap;
-import org.apache.commons.configuration.PropertiesConfiguration;
+
 import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.JsonProcessingException;
 import org.codehaus.jackson.annotate.JsonTypeInfo;
-import org.codehaus.jackson.map.*;
+import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.ObjectMapper.DefaultTypeResolverBuilder;
 import org.codehaus.jackson.map.ObjectMapper.DefaultTyping;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.codehaus.jackson.map.SerializerProvider;
 import org.codehaus.jackson.map.jsontype.impl.StdTypeResolverBuilder;
 import org.codehaus.jackson.type.JavaType;
-import org.codehaus.jettison.json.*;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.beanutils.BeanMap;
+import org.apache.commons.configuration.PropertiesConfiguration;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.InputPort;
+import com.datatorrent.api.Operator.OutputPort;
+import com.datatorrent.common.util.ObjectMapperString;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
+import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
+import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta;
+import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
+import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
+import com.datatorrent.stram.plan.logical.Operators;
+import com.datatorrent.stram.plan.logical.Operators.PortContextPair;
+
 /**
  * <p>LogicalPlanSerializer class.</p>
  *
@@ -88,7 +105,7 @@ public class LogicalPlanSerializer extends 
JsonSerializer<LogicalPlan>
    * @param dag
    * @return
    */
-  public static Map<String, Object> convertToMap(LogicalPlan dag)
+  public static Map<String, Object> convertToMap(LogicalPlan dag, boolean 
includeModules)
   {
     HashMap<String, Object> result = new HashMap<String, Object>();
     ArrayList<Object> operatorArray = new ArrayList< Object>();
@@ -200,6 +217,15 @@ public class LogicalPlanSerializer extends 
JsonSerializer<LogicalPlan>
         streamDetailMap.put("locality", streamMeta.getLocality().name());
       }
     }
+
+    if (includeModules) {
+      ArrayList<Map<String, Object>> modulesArray = new ArrayList<>();
+      result.put("modules", modulesArray);
+      for(LogicalPlan.ModuleMeta meta : dag.getAllModules()) {
+        modulesArray.add(getLogicalModuleDetails(dag, meta));
+      }
+    }
+
     return result;
   }
 
@@ -323,13 +349,43 @@ public class LogicalPlanSerializer extends 
JsonSerializer<LogicalPlan>
 
   public static JSONObject convertToJsonObject(LogicalPlan dag)
   {
-    return new JSONObject(convertToMap(dag));
+    return new JSONObject(convertToMap(dag, false));
   }
 
   @Override
-  public void serialize(LogicalPlan dag, JsonGenerator jg, SerializerProvider 
sp) throws IOException, JsonProcessingException
+  public void serialize(LogicalPlan dag, JsonGenerator jg, SerializerProvider 
sp) throws IOException,
+      JsonProcessingException
   {
-    jg.writeObject(convertToMap(dag));
+    jg.writeObject(convertToMap(dag, false));
+  }
+
+  /**
+   * Return information about operators and inner modules of a module.
+   *
+   * @param dag        top level DAG
+   * @param moduleMeta module information. DAG within module is used for 
constructing response.
+   * @return map with the module's "name", "className", "operators" (full names) and nested "modules"
+   */
+  private static Map<String, Object> getLogicalModuleDetails(LogicalPlan dag, 
LogicalPlan.ModuleMeta moduleMeta)
+  {
+    Map<String, Object> moduleDetailMap = new HashMap<String, Object>();
+    ArrayList<String> operatorArray = new ArrayList<>();
+    moduleDetailMap.put("name", moduleMeta.getName());
+    moduleDetailMap.put("className", 
moduleMeta.getModule().getClass().getName());
+
+    moduleDetailMap.put("operators", operatorArray);
+    for (OperatorMeta operatorMeta : moduleMeta.getDag().getAllOperators()) {
+      if (operatorMeta.getModuleName() == null) {
+        String fullName = moduleMeta.getFullName() + 
LogicalPlan.MODULE_NAMESPACE_SEPARATOR + operatorMeta.getName();
+        operatorArray.add(fullName);
+      }
+    }
+    ArrayList<Map<String, Object>> modulesArray = new ArrayList<>();
+    moduleDetailMap.put("modules", modulesArray);
+    for (LogicalPlan.ModuleMeta meta : moduleMeta.getDag().getAllModules()) {
+      modulesArray.add(getLogicalModuleDetails(dag, meta));
+    }
+    return moduleDetailMap;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/52187aa2/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java 
b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index a5b591c..47e077f 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -156,6 +156,7 @@ public class LogicalPlan implements Serializable, DAG
   private final Attribute.AttributeMap attributes = new DefaultAttributeMap();
   private transient int nodeIndex = 0; // used for cycle validation
   private transient Stack<OperatorMeta> stack = new Stack<OperatorMeta>(); // 
used for cycle validation
+  // streamLinks is used for connecting proxy ports to actual ports. This is 
not needed after dag is fully expanded.
   private transient Map<String, ArrayListMultimap<OutputPort<?>, 
InputPort<?>>> streamLinks = new HashMap<String, 
ArrayListMultimap<Operator.OutputPort<?>, Operator.InputPort<?>>>();
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/52187aa2/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java 
b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
index 6fdba00..a9c2068 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
@@ -20,8 +20,19 @@ package com.datatorrent.stram.webapp;
 
 import java.io.IOException;
 import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -29,18 +40,16 @@ import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.*;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 
-import org.apache.commons.beanutils.BeanMap;
-import org.apache.commons.beanutils.BeanUtils;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.webapp.NotFoundException;
-import org.apache.log4j.DTLoggerFactory;
 import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.JsonProcessingException;
 import org.codehaus.jackson.Version;
@@ -54,20 +63,30 @@ import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.beanutils.BeanMap;
+import org.apache.commons.beanutils.BeanUtils;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+import org.apache.log4j.DTLoggerFactory;
+
 import com.google.common.collect.Maps;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.StringCodec;
-
 import com.datatorrent.stram.StramAppContext;
 import com.datatorrent.stram.StreamingContainerAgent;
 import com.datatorrent.stram.StreamingContainerManager;
 import com.datatorrent.stram.StringCodecs;
 import com.datatorrent.stram.codec.LogicalPlanSerializer;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlan.ModuleMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
 import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest;
@@ -511,7 +530,7 @@ public class StramWebServices
     nodeList.operators = dagManager.getLogicalOperatorInfoList();
     return new JSONObject(objectMapper.writeValueAsString(nodeList));
   }
-
+  
   @GET
   @Path(PATH_LOGICAL_PLAN_OPERATORS + "/{operatorName}")
   @Produces(MediaType.APPLICATION_JSON)
@@ -571,7 +590,6 @@ public class StramWebServices
     }
     return response;
   }
-
   @POST // not supported by WebAppProxyServlet, can only be called directly
   @Path(PATH_PHYSICAL_PLAN_OPERATORS + "/{operatorId:\\d+}/properties")
   @Consumes(MediaType.APPLICATION_JSON)
@@ -633,60 +651,97 @@ public class StramWebServices
   public JSONObject getPorts(@PathParam("operatorName") String operatorName)
   {
     OperatorMeta logicalOperator = 
dagManager.getLogicalPlan().getOperatorMeta(operatorName);
+    Set<LogicalPlan.InputPortMeta> inputPorts;
+    Set<LogicalPlan.OutputPortMeta> outputPorts;
     if (logicalOperator == null) {
-      throw new NotFoundException();
+      ModuleMeta logicalModule = dagManager.getModuleMeta(operatorName);
+      if (logicalModule == null) {
+        throw new NotFoundException();
+      }
+      inputPorts = logicalModule.getInputStreams().keySet();
+      outputPorts = logicalModule.getOutputStreams().keySet();
+    } else {
+      inputPorts = logicalOperator.getInputStreams().keySet();
+      outputPorts = logicalOperator.getOutputStreams().keySet();
     }
+
+    JSONObject result = getPortsObjects(inputPorts, outputPorts);
+    return result;
+  }
+
+  private JSONObject getPortsObjects(Collection<LogicalPlan.InputPortMeta> 
inputs, Collection<LogicalPlan.OutputPortMeta> outputs)
+  {
     JSONObject result = new JSONObject();
     JSONArray ports = new JSONArray();
     try {
-      for (LogicalPlan.InputPortMeta inputPort : 
logicalOperator.getInputStreams().keySet()) {
+      for (LogicalPlan.InputPortMeta inputPort : inputs) {
         JSONObject port = new JSONObject();
         port.put("name", inputPort.getPortName());
         port.put("type", "input");
         ports.put(port);
       }
-      for (LogicalPlan.OutputPortMeta outputPort : 
logicalOperator.getOutputStreams().keySet()) {
+      for (LogicalPlan.OutputPortMeta outputPort : outputs) {
         JSONObject port = new JSONObject();
         port.put("name", outputPort.getPortName());
         port.put("type", "output");
         ports.put(port);
       }
       result.put("ports", ports);
-    }
-    catch (JSONException ex) {
+    } catch (JSONException ex) {
       throw new RuntimeException(ex);
     }
     return result;
   }
 
+  private JSONObject getPortObject(Collection<LogicalPlan.InputPortMeta> 
inputs, Collection<LogicalPlan.OutputPortMeta> outputs,
+                                   String portName) throws JSONException
+  {
+    for (LogicalPlan.InputPortMeta inputPort : inputs) {
+      if (inputPort.getPortName().equals(portName)) {
+        JSONObject port = new JSONObject();
+        port.put("name", inputPort.getPortName());
+        port.put("type", "input");
+        return port;
+      }
+    }
+    for (LogicalPlan.OutputPortMeta outputPort : outputs) {
+      if (outputPort.getPortName().equals(portName)) {
+        JSONObject port = new JSONObject();
+        port.put("name", outputPort.getPortName());
+        port.put("type", "output");
+        return port;
+      }
+    }
+    return null;
+  }
+
+
   @GET
   @Path(PATH_LOGICAL_PLAN_OPERATORS + "/{operatorName}/ports/{portName}")
   @Produces(MediaType.APPLICATION_JSON)
   public JSONObject getPort(@PathParam("operatorName") String operatorName, 
@PathParam("portName") String portName)
   {
     OperatorMeta logicalOperator = 
dagManager.getLogicalPlan().getOperatorMeta(operatorName);
+    Set<LogicalPlan.InputPortMeta> inputPorts;
+    Set<LogicalPlan.OutputPortMeta> outputPorts;
     if (logicalOperator == null) {
-      throw new NotFoundException();
+      ModuleMeta logicalModule = dagManager.getModuleMeta(operatorName);
+      if (logicalModule == null) {
+        throw new NotFoundException();
+      }
+      inputPorts = logicalModule.getInputStreams().keySet();
+      outputPorts = logicalModule.getOutputStreams().keySet();
+    } else {
+      inputPorts = logicalOperator.getInputStreams().keySet();
+      outputPorts = logicalOperator.getOutputStreams().keySet();
     }
+
     try {
-      for (LogicalPlan.InputPortMeta inputPort : 
logicalOperator.getInputStreams().keySet()) {
-        if (portName.equals(portName)) {
-          JSONObject port = new JSONObject();
-          port.put("name", inputPort.getPortName());
-          port.put("type", "input");
-          return port;
-        }
-      }
-      for (LogicalPlan.OutputPortMeta outputPort : 
logicalOperator.getOutputStreams().keySet()) {
-        if (portName.equals(portName)) {
-          JSONObject port = new JSONObject();
-          port.put("name", outputPort.getPortName());
-          port.put("type", "output");
-          return port;
-        }
+      JSONObject resp = getPortObject(inputPorts, outputPorts, portName);
+      if (resp != null) {
+        return resp;
       }
-    }
-    catch (JSONException ex) {
+    } catch (JSONException ex) {
       throw new RuntimeException(ex);
     }
     throw new NotFoundException();
@@ -711,18 +766,30 @@ public class StramWebServices
   {
     init();
     OperatorMeta logicalOperator = 
dagManager.getLogicalPlan().getOperatorMeta(operatorName);
+    BeanMap operatorProperties = null;
     if (logicalOperator == null) {
-      throw new NotFoundException();
+      ModuleMeta logicalModule = dagManager.getModuleMeta(operatorName);
+      if (logicalModule == null) {
+        throw new NotFoundException();
+      }
+      operatorProperties = 
LogicalPlanConfiguration.getObjectProperties(logicalModule.getModule());
+    } else {
+      operatorProperties = 
LogicalPlanConfiguration.getObjectProperties(logicalOperator.getOperator());
     }
 
-    BeanMap operatorProperties = 
LogicalPlanConfiguration.getObjectProperties(logicalOperator.getOperator());
+    Map<String, Object> m = getPropertiesAsMap(propertyName, 
operatorProperties);
+    return new JSONObject(objectMapper.writeValueAsString(m));
+  }
+
+  private Map<String, Object> getPropertiesAsMap(@QueryParam("propertyName") 
String propertyName, BeanMap operatorProperties)
+  {
     Map<String, Object> m = new HashMap<String, Object>();
     @SuppressWarnings("rawtypes")
     Iterator entryIterator = operatorProperties.entryIterator();
     while (entryIterator.hasNext()) {
       try {
         @SuppressWarnings("unchecked")
-        Map.Entry<String, Object> entry = (Map.Entry<String, 
Object>)entryIterator.next();
+        Entry<String, Object> entry = (Entry<String, 
Object>)entryIterator.next();
         if (propertyName == null) {
           m.put(entry.getKey(), entry.getValue());
         }
@@ -730,12 +797,11 @@ public class StramWebServices
           m.put(entry.getKey(), entry.getValue());
           break;
         }
-      }
-      catch (Exception ex) {
+      } catch (Exception ex) {
         LOG.warn("Caught exception", ex);
       }
     }
-    return new JSONObject(objectMapper.writeValueAsString(m));
+    return m;
   }
 
   @GET
@@ -765,10 +831,10 @@ public class StramWebServices
   @GET
   @Path(PATH_LOGICAL_PLAN)
   @Produces(MediaType.APPLICATION_JSON)
-  public JSONObject getLogicalPlan() throws JSONException, IOException
+  public JSONObject getLogicalPlan(@QueryParam("includeModules") String 
includeModules) throws JSONException, IOException
   {
-    LogicalPlan lp = dagManager.getLogicalPlan();
-    return new 
JSONObject(objectMapper.writeValueAsString(LogicalPlanSerializer.convertToMap(lp)));
+    return new 
JSONObject(objectMapper.writeValueAsString(LogicalPlanSerializer.convertToMap(
+        dagManager.getLogicalPlan(), includeModules != null)));
   }
 
   @POST // not supported by WebAppProxyServlet, can only be called directly

Reply via email to