APEXCORE-144, APEXCORE-145: REST API changes to view module information
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/52187aa2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/52187aa2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/52187aa2 Branch: refs/heads/feature-module Commit: 52187aa2d3ba44b6102860c155783b51a0805e14 Parents: eda1b60 Author: shubham <[email protected]> Authored: Tue Nov 17 12:10:08 2015 +0530 Committer: Tushar R. Gosavi <[email protected]> Committed: Fri Dec 18 11:42:12 2015 +0530 ---------------------------------------------------------------------- .../stram/StreamingContainerManager.java | 125 ++++++++++++--- .../java/com/datatorrent/stram/cli/DTCli.java | 8 +- .../stram/codec/LogicalPlanSerializer.java | 98 +++++++++--- .../stram/plan/logical/LogicalPlan.java | 1 + .../stram/webapp/StramWebServices.java | 156 +++++++++++++------ 5 files changed, 294 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/52187aa2/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index ca724db..c61a9fc 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -18,33 +18,47 @@ */ package com.datatorrent.stram; -import java.io.*; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Serializable; import java.lang.management.ManagementFactory; import java.lang.reflect.Field; import 
java.net.InetSocketAddress; import java.net.URI; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; -import com.datatorrent.netlet.util.DTThrowable; -import com.esotericsoftware.kryo.KryoException; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Predicate; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -import net.engio.mbassy.bus.MBassador; -import net.engio.mbassy.bus.config.BusConfiguration; - import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -71,13 +85,30 @@ import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; import 
org.apache.hadoop.yarn.webapp.NotFoundException; -import com.datatorrent.api.*; +import com.esotericsoftware.kryo.KryoException; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicate; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Component; +import com.datatorrent.api.Context; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.Operator.InputPort; import com.datatorrent.api.Operator.OutputPort; import com.datatorrent.api.Stats.OperatorStats; +import com.datatorrent.api.StatsListener; +import com.datatorrent.api.StorageAgent; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.StringCodec; import com.datatorrent.api.annotation.Stateless; - import com.datatorrent.bufferserver.auth.AuthManager; import com.datatorrent.bufferserver.util.Codec; import com.datatorrent.common.experimental.AppData; @@ -85,34 +116,62 @@ import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.common.util.FSStorageAgent; import com.datatorrent.common.util.NumberAggregate; import com.datatorrent.common.util.Pair; +import com.datatorrent.netlet.util.DTThrowable; import com.datatorrent.stram.Journal.Recoverable; import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest; -import com.datatorrent.stram.api.*; -import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.*; +import com.datatorrent.stram.api.AppDataSource; +import com.datatorrent.stram.api.Checkpoint; +import com.datatorrent.stram.api.ContainerContext; +import com.datatorrent.stram.api.OperatorDeployInfo; +import com.datatorrent.stram.api.StramEvent; 
+import com.datatorrent.stram.api.StramToNodeChangeLoggersRequest; +import com.datatorrent.stram.api.StramToNodeGetPropertyRequest; +import com.datatorrent.stram.api.StramToNodeSetPropertyRequest; +import com.datatorrent.stram.api.StramToNodeStartRecordingRequest; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHeartbeat; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.StramToNodeRequest; +import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.StreamingContainerContext; import com.datatorrent.stram.engine.OperatorResponse; import com.datatorrent.stram.engine.StreamingContainer; import com.datatorrent.stram.engine.WindowGenerator; import com.datatorrent.stram.plan.logical.LogicalOperatorStatus; import com.datatorrent.stram.plan.logical.LogicalPlan; import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta; +import com.datatorrent.stram.plan.logical.LogicalPlan.ModuleMeta; import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta; import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta; +import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta; import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration; import com.datatorrent.stram.plan.logical.Operators; import com.datatorrent.stram.plan.logical.Operators.PortContextPair; import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest; -import com.datatorrent.stram.plan.physical.*; +import com.datatorrent.stram.plan.physical.OperatorStatus; import com.datatorrent.stram.plan.physical.OperatorStatus.PortStatus; +import com.datatorrent.stram.plan.physical.PTContainer; +import com.datatorrent.stram.plan.physical.PTOperator; import 
com.datatorrent.stram.plan.physical.PTOperator.PTInput; import com.datatorrent.stram.plan.physical.PTOperator.PTOutput; import com.datatorrent.stram.plan.physical.PTOperator.State; +import com.datatorrent.stram.plan.physical.PhysicalPlan; import com.datatorrent.stram.plan.physical.PhysicalPlan.PlanContext; +import com.datatorrent.stram.plan.physical.PlanModifier; import com.datatorrent.stram.util.ConfigUtils; import com.datatorrent.stram.util.FSJsonLineFile; import com.datatorrent.stram.util.MovingAverage.MovingAverageLong; import com.datatorrent.stram.util.SharedPubSubWebSocketClient; import com.datatorrent.stram.util.WebServicesClient; -import com.datatorrent.stram.webapp.*; +import com.datatorrent.stram.webapp.ContainerInfo; +import com.datatorrent.stram.webapp.LogicalOperatorInfo; +import com.datatorrent.stram.webapp.OperatorAggregationInfo; +import com.datatorrent.stram.webapp.OperatorInfo; +import com.datatorrent.stram.webapp.PortInfo; +import com.datatorrent.stram.webapp.StreamInfo; + +import net.engio.mbassy.bus.MBassador; +import net.engio.mbassy.bus.config.BusConfiguration; /** * Tracks topology provisioning/allocation to containers<p> @@ -2246,6 +2305,24 @@ public class StreamingContainerManager implements PlanContext return fillLogicalOperatorInfo(operatorMeta); } + public ModuleMeta getModuleMeta(String moduleName) + { + return getModuleMeta(moduleName, getLogicalPlan()); + } + + private ModuleMeta getModuleMeta(String moduleName, LogicalPlan dag) + { + for (ModuleMeta m : dag.getAllModules()) { + if (m.getFullName().equals(moduleName)) { + return m; + } + ModuleMeta res = getModuleMeta(moduleName, m.getDag()); + if (res != null) + return res; + } + return null; + } + public List<LogicalOperatorInfo> getLogicalOperatorInfoList() { List<LogicalOperatorInfo> infoList = new ArrayList<LogicalOperatorInfo>(); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/52187aa2/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java 
---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java b/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java index deb0967..696f497 100644 --- a/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java +++ b/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java @@ -2824,7 +2824,7 @@ public class DTCli } LogicalPlan logicalPlan = appFactory.createApp(submitApp.getLogicalPlanConfiguration()); map.put("applicationName", appFactory.getName()); - map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan)); + map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan, false)); } finally { if (raw) { System.setOut(originalStream); @@ -2840,7 +2840,7 @@ public class DTCli LogicalPlan logicalPlan = appFactory.createApp(submitApp.getLogicalPlanConfiguration()); Map<String, Object> map = new HashMap<String, Object>(); map.put("applicationName", appFactory.getName()); - map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan)); + map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan, false)); printJson(map); } else if (filename.endsWith(".properties")) { File file = new File(filename); @@ -2849,7 +2849,7 @@ public class DTCli LogicalPlan logicalPlan = appFactory.createApp(submitApp.getLogicalPlanConfiguration()); Map<String, Object> map = new HashMap<String, Object>(); map.put("applicationName", appFactory.getName()); - map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan)); + map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan, false)); printJson(map); } else { StramAppLauncher submitApp = getStramAppLauncher(filename, config, commandLineInfo.ignorePom); @@ -2893,7 +2893,7 @@ public class DTCli Map<String, Object> map = new HashMap<String, Object>(); map.put("applicationName", appInfo.name); if (appInfo.dag != null) { - map.put("logicalPlan", LogicalPlanSerializer.convertToMap(appInfo.dag)); + 
map.put("logicalPlan", LogicalPlanSerializer.convertToMap(appInfo.dag, false)); } if (appInfo.error != null) { map.put("error", appInfo.error); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/52187aa2/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java b/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java index 90dd2b5..49295d8 100644 --- a/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java +++ b/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java @@ -18,36 +18,53 @@ */ package com.datatorrent.stram.codec; -import com.datatorrent.api.*; -import com.datatorrent.api.Attribute; -import com.datatorrent.api.DAG.Locality; -import com.datatorrent.api.Operator.InputPort; -import com.datatorrent.api.Operator.OutputPort; -import com.datatorrent.common.util.ObjectMapperString; -import com.datatorrent.stram.plan.logical.*; -import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta; -import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta; -import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta; -import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta; -import com.datatorrent.stram.plan.logical.Operators.PortContextPair; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + import javax.ws.rs.Produces; import javax.ws.rs.ext.Provider; -import org.apache.commons.beanutils.BeanMap; -import org.apache.commons.configuration.PropertiesConfiguration; + import org.codehaus.jackson.JsonGenerator; import org.codehaus.jackson.JsonProcessingException; import org.codehaus.jackson.annotate.JsonTypeInfo; -import 
org.codehaus.jackson.map.*; +import org.codehaus.jackson.map.JsonSerializer; +import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectMapper.DefaultTypeResolverBuilder; import org.codehaus.jackson.map.ObjectMapper.DefaultTyping; +import org.codehaus.jackson.map.SerializationConfig; +import org.codehaus.jackson.map.SerializerProvider; import org.codehaus.jackson.map.jsontype.impl.StdTypeResolverBuilder; import org.codehaus.jackson.type.JavaType; -import org.codehaus.jettison.json.*; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.beanutils.BeanMap; +import org.apache.commons.configuration.PropertiesConfiguration; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.InputPort; +import com.datatorrent.api.Operator.OutputPort; +import com.datatorrent.common.util.ObjectMapperString; +import com.datatorrent.stram.plan.logical.LogicalPlan; +import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta; +import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta; +import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta; +import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta; +import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration; +import com.datatorrent.stram.plan.logical.Operators; +import com.datatorrent.stram.plan.logical.Operators.PortContextPair; + /** * <p>LogicalPlanSerializer class.</p> * @@ -88,7 +105,7 @@ public class LogicalPlanSerializer extends JsonSerializer<LogicalPlan> * @param dag * @return */ - public static Map<String, Object> convertToMap(LogicalPlan dag) + public static Map<String, Object> convertToMap(LogicalPlan dag, boolean includeModules) { HashMap<String, 
Object> result = new HashMap<String, Object>(); ArrayList<Object> operatorArray = new ArrayList< Object>(); @@ -200,6 +217,15 @@ public class LogicalPlanSerializer extends JsonSerializer<LogicalPlan> streamDetailMap.put("locality", streamMeta.getLocality().name()); } } + + if (includeModules) { + ArrayList<Map<String, Object>> modulesArray = new ArrayList<>(); + result.put("modules", modulesArray); + for(LogicalPlan.ModuleMeta meta : dag.getAllModules()) { + modulesArray.add(getLogicalModuleDetails(dag, meta)); + } + } + return result; } @@ -323,13 +349,43 @@ public class LogicalPlanSerializer extends JsonSerializer<LogicalPlan> public static JSONObject convertToJsonObject(LogicalPlan dag) { - return new JSONObject(convertToMap(dag)); + return new JSONObject(convertToMap(dag, false)); } @Override - public void serialize(LogicalPlan dag, JsonGenerator jg, SerializerProvider sp) throws IOException, JsonProcessingException + public void serialize(LogicalPlan dag, JsonGenerator jg, SerializerProvider sp) throws IOException, + JsonProcessingException { - jg.writeObject(convertToMap(dag)); + jg.writeObject(convertToMap(dag, false)); + } + + /** + * Return information about operators and inner modules of a module. + * + * @param dag top level DAG + * @param moduleMeta module information. DAG within module is used for constructing response. 
+ * @return + */ + private static Map<String, Object> getLogicalModuleDetails(LogicalPlan dag, LogicalPlan.ModuleMeta moduleMeta) + { + Map<String, Object> moduleDetailMap = new HashMap<String, Object>(); + ArrayList<String> operatorArray = new ArrayList<>(); + moduleDetailMap.put("name", moduleMeta.getName()); + moduleDetailMap.put("className", moduleMeta.getModule().getClass().getName()); + + moduleDetailMap.put("operators", operatorArray); + for (OperatorMeta operatorMeta : moduleMeta.getDag().getAllOperators()) { + if (operatorMeta.getModuleName() == null) { + String fullName = moduleMeta.getFullName() + LogicalPlan.MODULE_NAMESPACE_SEPARATOR + operatorMeta.getName(); + operatorArray.add(fullName); + } + } + ArrayList<Map<String, Object>> modulesArray = new ArrayList<>(); + moduleDetailMap.put("modules", modulesArray); + for (LogicalPlan.ModuleMeta meta : moduleMeta.getDag().getAllModules()) { + modulesArray.add(getLogicalModuleDetails(dag, meta)); + } + return moduleDetailMap; } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/52187aa2/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index a5b591c..47e077f 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -156,6 +156,7 @@ public class LogicalPlan implements Serializable, DAG private final Attribute.AttributeMap attributes = new DefaultAttributeMap(); private transient int nodeIndex = 0; // used for cycle validation private transient Stack<OperatorMeta> stack = new Stack<OperatorMeta>(); // used for cycle validation + // streamLinks is used for connecting proxy ports to actual ports. 
This is not needed after dag is fully expanded. private transient Map<String, ArrayListMultimap<OutputPort<?>, InputPort<?>>> streamLinks = new HashMap<String, ArrayListMultimap<Operator.OutputPort<?>, Operator.InputPort<?>>>(); @Override http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/52187aa2/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java index 6fdba00..a9c2068 100644 --- a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java +++ b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java @@ -20,8 +20,19 @@ package com.datatorrent.stram.webapp; import java.io.IOException; import java.text.SimpleDateFormat; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -29,18 +40,16 @@ import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import javax.ws.rs.*; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; -import org.apache.commons.beanutils.BeanMap; -import org.apache.commons.beanutils.BeanUtils; -import 
org.apache.commons.codec.binary.Base64; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.hadoop.yarn.webapp.NotFoundException; -import org.apache.log4j.DTLoggerFactory; import org.codehaus.jackson.JsonGenerator; import org.codehaus.jackson.JsonProcessingException; import org.codehaus.jackson.Version; @@ -54,20 +63,30 @@ import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.beanutils.BeanMap; +import org.apache.commons.beanutils.BeanUtils; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.webapp.NotFoundException; +import org.apache.log4j.DTLoggerFactory; + import com.google.common.collect.Maps; import com.google.inject.Inject; import com.google.inject.Singleton; + import com.datatorrent.api.Attribute; import com.datatorrent.api.Context.DAGContext; import com.datatorrent.api.Operator; import com.datatorrent.api.StringCodec; - import com.datatorrent.stram.StramAppContext; import com.datatorrent.stram.StreamingContainerAgent; import com.datatorrent.stram.StreamingContainerManager; import com.datatorrent.stram.StringCodecs; import com.datatorrent.stram.codec.LogicalPlanSerializer; import com.datatorrent.stram.plan.logical.LogicalPlan; +import com.datatorrent.stram.plan.logical.LogicalPlan.ModuleMeta; import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta; import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration; import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest; @@ -511,7 +530,7 @@ public class StramWebServices nodeList.operators = dagManager.getLogicalOperatorInfoList(); return new JSONObject(objectMapper.writeValueAsString(nodeList)); } - 
+ @GET @Path(PATH_LOGICAL_PLAN_OPERATORS + "/{operatorName}") @Produces(MediaType.APPLICATION_JSON) @@ -571,7 +590,6 @@ public class StramWebServices } return response; } - @POST // not supported by WebAppProxyServlet, can only be called directly @Path(PATH_PHYSICAL_PLAN_OPERATORS + "/{operatorId:\\d+}/properties") @Consumes(MediaType.APPLICATION_JSON) @@ -633,60 +651,97 @@ public class StramWebServices public JSONObject getPorts(@PathParam("operatorName") String operatorName) { OperatorMeta logicalOperator = dagManager.getLogicalPlan().getOperatorMeta(operatorName); + Set<LogicalPlan.InputPortMeta> inputPorts; + Set<LogicalPlan.OutputPortMeta> outputPorts; if (logicalOperator == null) { - throw new NotFoundException(); + ModuleMeta logicalModule = dagManager.getModuleMeta(operatorName); + if (logicalModule == null) { + throw new NotFoundException(); + } + inputPorts = logicalModule.getInputStreams().keySet(); + outputPorts = logicalModule.getOutputStreams().keySet(); + } else { + inputPorts = logicalOperator.getInputStreams().keySet(); + outputPorts = logicalOperator.getOutputStreams().keySet(); } + + JSONObject result = getPortsObjects(inputPorts, outputPorts); + return result; + } + + private JSONObject getPortsObjects(Collection<LogicalPlan.InputPortMeta> inputs, Collection<LogicalPlan.OutputPortMeta> outputs) + { JSONObject result = new JSONObject(); JSONArray ports = new JSONArray(); try { - for (LogicalPlan.InputPortMeta inputPort : logicalOperator.getInputStreams().keySet()) { + for (LogicalPlan.InputPortMeta inputPort : inputs) { JSONObject port = new JSONObject(); port.put("name", inputPort.getPortName()); port.put("type", "input"); ports.put(port); } - for (LogicalPlan.OutputPortMeta outputPort : logicalOperator.getOutputStreams().keySet()) { + for (LogicalPlan.OutputPortMeta outputPort : outputs) { JSONObject port = new JSONObject(); port.put("name", outputPort.getPortName()); port.put("type", "output"); ports.put(port); } result.put("ports", ports); - 
} - catch (JSONException ex) { + } catch (JSONException ex) { throw new RuntimeException(ex); } return result; } + private JSONObject getPortObject(Collection<LogicalPlan.InputPortMeta> inputs, Collection<LogicalPlan.OutputPortMeta> outputs, + String portName) throws JSONException + { + for (LogicalPlan.InputPortMeta inputPort : inputs) { + if (inputPort.getPortName().equals(portName)) { + JSONObject port = new JSONObject(); + port.put("name", inputPort.getPortName()); + port.put("type", "input"); + return port; + } + } + for (LogicalPlan.OutputPortMeta outputPort : outputs) { + if (outputPort.getPortName().equals(portName)) { + JSONObject port = new JSONObject(); + port.put("name", outputPort.getPortName()); + port.put("type", "output"); + return port; + } + } + return null; + } + + @GET @Path(PATH_LOGICAL_PLAN_OPERATORS + "/{operatorName}/ports/{portName}") @Produces(MediaType.APPLICATION_JSON) public JSONObject getPort(@PathParam("operatorName") String operatorName, @PathParam("portName") String portName) { OperatorMeta logicalOperator = dagManager.getLogicalPlan().getOperatorMeta(operatorName); + Set<LogicalPlan.InputPortMeta> inputPorts; + Set<LogicalPlan.OutputPortMeta> outputPorts; if (logicalOperator == null) { - throw new NotFoundException(); + ModuleMeta logicalModule = dagManager.getModuleMeta(operatorName); + if (logicalModule == null) { + throw new NotFoundException(); + } + inputPorts = logicalModule.getInputStreams().keySet(); + outputPorts = logicalModule.getOutputStreams().keySet(); + } else { + inputPorts = logicalOperator.getInputStreams().keySet(); + outputPorts = logicalOperator.getOutputStreams().keySet(); } + try { - for (LogicalPlan.InputPortMeta inputPort : logicalOperator.getInputStreams().keySet()) { - if (portName.equals(portName)) { - JSONObject port = new JSONObject(); - port.put("name", inputPort.getPortName()); - port.put("type", "input"); - return port; - } - } - for (LogicalPlan.OutputPortMeta outputPort : 
logicalOperator.getOutputStreams().keySet()) { - if (portName.equals(portName)) { - JSONObject port = new JSONObject(); - port.put("name", outputPort.getPortName()); - port.put("type", "output"); - return port; - } + JSONObject resp = getPortObject(inputPorts, outputPorts, portName); + if (resp != null) { + return resp; } - } - catch (JSONException ex) { + } catch (JSONException ex) { throw new RuntimeException(ex); } throw new NotFoundException(); @@ -711,18 +766,30 @@ public class StramWebServices { init(); OperatorMeta logicalOperator = dagManager.getLogicalPlan().getOperatorMeta(operatorName); + BeanMap operatorProperties = null; if (logicalOperator == null) { - throw new NotFoundException(); + ModuleMeta logicalModule = dagManager.getModuleMeta(operatorName); + if (logicalModule == null) { + throw new NotFoundException(); + } + operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalModule.getModule()); + } else { + operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalOperator.getOperator()); } - BeanMap operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalOperator.getOperator()); + Map<String, Object> m = getPropertiesAsMap(propertyName, operatorProperties); + return new JSONObject(objectMapper.writeValueAsString(m)); + } + + private Map<String, Object> getPropertiesAsMap(@QueryParam("propertyName") String propertyName, BeanMap operatorProperties) + { Map<String, Object> m = new HashMap<String, Object>(); @SuppressWarnings("rawtypes") Iterator entryIterator = operatorProperties.entryIterator(); while (entryIterator.hasNext()) { try { @SuppressWarnings("unchecked") - Map.Entry<String, Object> entry = (Map.Entry<String, Object>)entryIterator.next(); + Entry<String, Object> entry = (Entry<String, Object>)entryIterator.next(); if (propertyName == null) { m.put(entry.getKey(), entry.getValue()); } @@ -730,12 +797,11 @@ public class StramWebServices m.put(entry.getKey(), entry.getValue()); break; } - } - 
catch (Exception ex) { + } catch (Exception ex) { LOG.warn("Caught exception", ex); } } - return new JSONObject(objectMapper.writeValueAsString(m)); + return m; } @GET @@ -765,10 +831,10 @@ public class StramWebServices @GET @Path(PATH_LOGICAL_PLAN) @Produces(MediaType.APPLICATION_JSON) - public JSONObject getLogicalPlan() throws JSONException, IOException + public JSONObject getLogicalPlan(@QueryParam("includeModules") String includeModules) throws JSONException, IOException { - LogicalPlan lp = dagManager.getLogicalPlan(); - return new JSONObject(objectMapper.writeValueAsString(LogicalPlanSerializer.convertToMap(lp))); + return new JSONObject(objectMapper.writeValueAsString(LogicalPlanSerializer.convertToMap( + dagManager.getLogicalPlan(), includeModules != null))); } @POST // not supported by WebAppProxyServlet, can only be called directly
