APEXCORE-144, APEXCORE-145 Rest api changes to view module information
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/47b3ce8e Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/47b3ce8e Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/47b3ce8e Branch: refs/heads/master Commit: 47b3ce8e4019f325c2e91968cb120763f18bb174 Parents: c1314ea Author: shubham <[email protected]> Authored: Tue Nov 17 12:10:08 2015 +0530 Committer: Tushar R. Gosavi <[email protected]> Committed: Tue Dec 22 12:03:55 2015 +0530 ---------------------------------------------------------------------- .../stram/StreamingContainerManager.java | 20 +++ .../java/com/datatorrent/stram/cli/DTCli.java | 8 +- .../stram/codec/LogicalPlanSerializer.java | 47 ++++++- .../stram/plan/logical/LogicalPlan.java | 7 +- .../plan/logical/LogicalPlanConfiguration.java | 4 +- .../stram/webapp/StramWebServices.java | 123 +++++++++++++------ 6 files changed, 158 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/47b3ce8e/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index 29c6a2c..162245b 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -95,6 +95,7 @@ import com.datatorrent.stram.engine.WindowGenerator; import com.datatorrent.stram.plan.logical.LogicalOperatorStatus; import com.datatorrent.stram.plan.logical.LogicalPlan; import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta; +import com.datatorrent.stram.plan.logical.LogicalPlan.ModuleMeta; import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta; import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta; import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration; @@ -2246,6 +2247,25 @@ public class StreamingContainerManager implements PlanContext return fillLogicalOperatorInfo(operatorMeta); } + public ModuleMeta getModuleMeta(String moduleName) + { + return getModuleMeta(moduleName, getLogicalPlan()); + } + + private ModuleMeta getModuleMeta(String moduleName, LogicalPlan dag) + { + for (ModuleMeta m : dag.getAllModules()) { + if (m.getFullName().equals(moduleName)) { + return m; + } + ModuleMeta res = getModuleMeta(moduleName, m.getDag()); + if (res != null) { + return res; + } + } + return null; + } + public List<LogicalOperatorInfo> getLogicalOperatorInfoList() { List<LogicalOperatorInfo> infoList = new ArrayList<LogicalOperatorInfo>(); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/47b3ce8e/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java b/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java index deb0967..696f497 100644 --- a/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java +++ b/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java @@ -2824,7 +2824,7 @@ public class DTCli } LogicalPlan logicalPlan = appFactory.createApp(submitApp.getLogicalPlanConfiguration()); map.put("applicationName", appFactory.getName()); - map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan)); + map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan, false)); } finally { if (raw) { System.setOut(originalStream); @@ -2840,7 +2840,7 @@ public class DTCli LogicalPlan logicalPlan = appFactory.createApp(submitApp.getLogicalPlanConfiguration()); Map<String, Object> map = new HashMap<String, Object>(); map.put("applicationName", appFactory.getName()); - map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan)); + map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan, false)); printJson(map); } else if (filename.endsWith(".properties")) { File file = new File(filename); @@ -2849,7 +2849,7 @@ public class DTCli LogicalPlan logicalPlan = appFactory.createApp(submitApp.getLogicalPlanConfiguration()); Map<String, Object> map = new HashMap<String, Object>(); map.put("applicationName", appFactory.getName()); - map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan)); + map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan, false)); printJson(map); } else { StramAppLauncher submitApp = getStramAppLauncher(filename, config, commandLineInfo.ignorePom); @@ -2893,7 +2893,7 @@ public class DTCli Map<String, Object> map = new HashMap<String, Object>(); map.put("applicationName", appInfo.name); if (appInfo.dag != null) { - map.put("logicalPlan", LogicalPlanSerializer.convertToMap(appInfo.dag)); + map.put("logicalPlan", LogicalPlanSerializer.convertToMap(appInfo.dag, false)); } if (appInfo.error != null) { map.put("error", appInfo.error); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/47b3ce8e/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java b/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java index 90dd2b5..7b61d5b 100644 --- a/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java +++ b/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java @@ -88,7 +88,7 @@ public class LogicalPlanSerializer extends JsonSerializer<LogicalPlan> * @param dag * @return */ - public static Map<String, Object> convertToMap(LogicalPlan dag) + public static Map<String, Object> convertToMap(LogicalPlan dag, boolean includeModules) { HashMap<String, Object> result = new HashMap<String, Object>(); ArrayList<Object> operatorArray = new ArrayList< Object>(); @@ -200,6 +200,15 @@ public class LogicalPlanSerializer extends JsonSerializer<LogicalPlan> streamDetailMap.put("locality", streamMeta.getLocality().name()); } } + + if (includeModules) { + ArrayList<Map<String, Object>> modulesArray = new ArrayList<>(); + result.put("modules", modulesArray); + for(LogicalPlan.ModuleMeta meta : dag.getAllModules()) { + modulesArray.add(getLogicalModuleDetails(dag, meta)); + } + } + return result; } @@ -323,13 +332,43 @@ public class LogicalPlanSerializer extends JsonSerializer<LogicalPlan> public static JSONObject convertToJsonObject(LogicalPlan dag) { - return new JSONObject(convertToMap(dag)); + return new JSONObject(convertToMap(dag, false)); } @Override - public void serialize(LogicalPlan dag, JsonGenerator jg, SerializerProvider sp) throws IOException, JsonProcessingException + public void serialize(LogicalPlan dag, JsonGenerator jg, SerializerProvider sp) throws IOException, + JsonProcessingException { - jg.writeObject(convertToMap(dag)); + jg.writeObject(convertToMap(dag, false)); + } + + /** + * Return information about operators and inner modules of a module. + * + * @param dag top level DAG + * @param moduleMeta module information. DAG within module is used for constructing response. + * @return + */ + private static Map<String, Object> getLogicalModuleDetails(LogicalPlan dag, LogicalPlan.ModuleMeta moduleMeta) + { + Map<String, Object> moduleDetailMap = new HashMap<String, Object>(); + ArrayList<String> operatorArray = new ArrayList<>(); + moduleDetailMap.put("name", moduleMeta.getName()); + moduleDetailMap.put("className", moduleMeta.getModule().getClass().getName()); + + moduleDetailMap.put("operators", operatorArray); + for (OperatorMeta operatorMeta : moduleMeta.getDag().getAllOperators()) { + if (operatorMeta.getModuleName() == null) { + String fullName = moduleMeta.getFullName() + LogicalPlan.MODULE_NAMESPACE_SEPARATOR + operatorMeta.getName(); + operatorArray.add(fullName); + } + } + ArrayList<Map<String, Object>> modulesArray = new ArrayList<>(); + moduleDetailMap.put("modules", modulesArray); + for (LogicalPlan.ModuleMeta meta : moduleMeta.getDag().getAllModules()) { + modulesArray.add(getLogicalModuleDetails(dag, meta)); + } + return moduleDetailMap; } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/47b3ce8e/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index 21039cc..377fa6d 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -32,6 +32,7 @@ import javax.validation.constraints.NotNull; import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.slf4j.Logger; @@ -1090,9 +1091,9 @@ public class LogicalPlan implements Serializable, DAG } // Avoid name conflict with module. - if (modules.containsKey(name)) + if (modules.containsKey(name)) { throw new IllegalArgumentException("duplicate operator id: " + operators.get(name)); - + } OperatorMeta decl = new OperatorMeta(name, operator); rootOperators.add(decl); // will be removed when a sink is added to an input port for this operator operators.put(name, decl); @@ -1193,7 +1194,7 @@ public class LogicalPlan implements Serializable, DAG * @param parentDAG parent dag to populate with operators from this and inner modules. * @param conf configuration object. */ - public void flattenModule(LogicalPlan parentDAG, org.apache.hadoop.conf.Configuration conf) + public void flattenModule(LogicalPlan parentDAG, Configuration conf) { module.populateDAG(dag, conf); for (ModuleMeta subModuleMeta : dag.getAllModules()) { http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/47b3ce8e/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java index 483576a..dbd3bc3 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java @@ -18,6 +18,7 @@ */ package com.datatorrent.stram.plan.logical; + import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; @@ -31,6 +32,7 @@ import java.lang.reflect.Type; import java.util.*; import java.util.Map.Entry; + import javax.validation.ValidationException; import com.google.common.annotations.VisibleForTesting; @@ -45,7 +47,6 @@ import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.commons.beanutils.BeanMap; import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.collections.CollectionUtils; @@ -60,6 +61,7 @@ import com.datatorrent.api.Context.DAGContext; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.Context.PortContext; import com.datatorrent.api.annotation.ApplicationAnnotation; + import com.datatorrent.stram.StramUtils; import com.datatorrent.stram.client.StramClientUtils; import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta; http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/47b3ce8e/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java index 6fdba00..fd47d35 100644 --- a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java +++ b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java @@ -68,6 +68,7 @@ import com.datatorrent.stram.StreamingContainerManager; import com.datatorrent.stram.StringCodecs; import com.datatorrent.stram.codec.LogicalPlanSerializer; import com.datatorrent.stram.plan.logical.LogicalPlan; +import com.datatorrent.stram.plan.logical.LogicalPlan.ModuleMeta; import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta; import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration; import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest; @@ -561,17 +562,14 @@ public class StramWebServices LOG.debug("Setting property for {}: {}={}", operatorName, key, val); dagManager.setOperatorProperty(operatorName, key, val); } - } - catch (JSONException ex) { + } catch (JSONException ex) { LOG.warn("Got JSON Exception: ", ex); - } - catch (Exception ex) { + } catch (Exception ex) { LOG.error("Caught exception: ", ex); throw new RuntimeException(ex); } return response; } - @POST // not supported by WebAppProxyServlet, can only be called directly @Path(PATH_PHYSICAL_PLAN_OPERATORS + "/{operatorId:\\d+}/properties") @Consumes(MediaType.APPLICATION_JSON) @@ -633,60 +631,97 @@ public class StramWebServices public JSONObject getPorts(@PathParam("operatorName") String operatorName) { OperatorMeta logicalOperator = dagManager.getLogicalPlan().getOperatorMeta(operatorName); + Set<LogicalPlan.InputPortMeta> inputPorts; + Set<LogicalPlan.OutputPortMeta> outputPorts; if (logicalOperator == null) { - throw new NotFoundException(); + ModuleMeta logicalModule = dagManager.getModuleMeta(operatorName); + if (logicalModule == null) { + throw new NotFoundException(); + } + inputPorts = logicalModule.getInputStreams().keySet(); + outputPorts = logicalModule.getOutputStreams().keySet(); + } else { + inputPorts = logicalOperator.getInputStreams().keySet(); + outputPorts = logicalOperator.getOutputStreams().keySet(); } + + JSONObject result = getPortsObjects(inputPorts, outputPorts); + return result; + } + + private JSONObject getPortsObjects(Collection<LogicalPlan.InputPortMeta> inputs, Collection<LogicalPlan.OutputPortMeta> outputs) + { JSONObject result = new JSONObject(); JSONArray ports = new JSONArray(); try { - for (LogicalPlan.InputPortMeta inputPort : logicalOperator.getInputStreams().keySet()) { + for (LogicalPlan.InputPortMeta inputPort : inputs) { JSONObject port = new JSONObject(); port.put("name", inputPort.getPortName()); port.put("type", "input"); ports.put(port); } - for (LogicalPlan.OutputPortMeta outputPort : logicalOperator.getOutputStreams().keySet()) { + for (LogicalPlan.OutputPortMeta outputPort : outputs) { JSONObject port = new JSONObject(); port.put("name", outputPort.getPortName()); port.put("type", "output"); ports.put(port); } result.put("ports", ports); - } - catch (JSONException ex) { + } catch (JSONException ex) { throw new RuntimeException(ex); } return result; } + private JSONObject getPortObject(Collection<LogicalPlan.InputPortMeta> inputs, Collection<LogicalPlan.OutputPortMeta> outputs, + String portName) throws JSONException + { + for (LogicalPlan.InputPortMeta inputPort : inputs) { + if (inputPort.getPortName().equals(portName)) { + JSONObject port = new JSONObject(); + port.put("name", inputPort.getPortName()); + port.put("type", "input"); + return port; + } + } + for (LogicalPlan.OutputPortMeta outputPort : outputs) { + if (outputPort.getPortName().equals(portName)) { + JSONObject port = new JSONObject(); + port.put("name", outputPort.getPortName()); + port.put("type", "output"); + return port; + } + } + return null; + } + + @GET @Path(PATH_LOGICAL_PLAN_OPERATORS + "/{operatorName}/ports/{portName}") @Produces(MediaType.APPLICATION_JSON) public JSONObject getPort(@PathParam("operatorName") String operatorName, @PathParam("portName") String portName) { OperatorMeta logicalOperator = dagManager.getLogicalPlan().getOperatorMeta(operatorName); + Set<LogicalPlan.InputPortMeta> inputPorts; + Set<LogicalPlan.OutputPortMeta> outputPorts; if (logicalOperator == null) { - throw new NotFoundException(); + ModuleMeta logicalModule = dagManager.getModuleMeta(operatorName); + if (logicalModule == null) { + throw new NotFoundException(); + } + inputPorts = logicalModule.getInputStreams().keySet(); + outputPorts = logicalModule.getOutputStreams().keySet(); + } else { + inputPorts = logicalOperator.getInputStreams().keySet(); + outputPorts = logicalOperator.getOutputStreams().keySet(); } + try { - for (LogicalPlan.InputPortMeta inputPort : logicalOperator.getInputStreams().keySet()) { - if (portName.equals(portName)) { - JSONObject port = new JSONObject(); - port.put("name", inputPort.getPortName()); - port.put("type", "input"); - return port; - } - } - for (LogicalPlan.OutputPortMeta outputPort : logicalOperator.getOutputStreams().keySet()) { - if (portName.equals(portName)) { - JSONObject port = new JSONObject(); - port.put("name", outputPort.getPortName()); - port.put("type", "output"); - return port; - } + JSONObject resp = getPortObject(inputPorts, outputPorts, portName); + if (resp != null) { + return resp; } - } - catch (JSONException ex) { + } catch (JSONException ex) { throw new RuntimeException(ex); } throw new NotFoundException(); @@ -711,31 +746,41 @@ public class StramWebServices { init(); OperatorMeta logicalOperator = dagManager.getLogicalPlan().getOperatorMeta(operatorName); + BeanMap operatorProperties = null; if (logicalOperator == null) { - throw new NotFoundException(); + ModuleMeta logicalModule = dagManager.getModuleMeta(operatorName); + if (logicalModule == null) { + throw new NotFoundException(); + } + operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalModule.getModule()); + } else { + operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalOperator.getOperator()); } - BeanMap operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalOperator.getOperator()); + Map<String, Object> m = getPropertiesAsMap(propertyName, operatorProperties); + return new JSONObject(objectMapper.writeValueAsString(m)); + } + + private Map<String, Object> getPropertiesAsMap(@QueryParam("propertyName") String propertyName, BeanMap operatorProperties) + { Map<String, Object> m = new HashMap<String, Object>(); @SuppressWarnings("rawtypes") Iterator entryIterator = operatorProperties.entryIterator(); while (entryIterator.hasNext()) { try { @SuppressWarnings("unchecked") - Map.Entry<String, Object> entry = (Map.Entry<String, Object>)entryIterator.next(); + Entry<String, Object> entry = (Entry<String, Object>)entryIterator.next(); if (propertyName == null) { m.put(entry.getKey(), entry.getValue()); - } - else if (propertyName.equals(entry.getKey())) { + } else if (propertyName.equals(entry.getKey())) { m.put(entry.getKey(), entry.getValue()); break; } - } - catch (Exception ex) { + } catch (Exception ex) { LOG.warn("Caught exception", ex); } } - return new JSONObject(objectMapper.writeValueAsString(m)); + return m; } @GET @@ -765,10 +810,10 @@ public class StramWebServices @GET @Path(PATH_LOGICAL_PLAN) @Produces(MediaType.APPLICATION_JSON) - public JSONObject getLogicalPlan() throws JSONException, IOException + public JSONObject getLogicalPlan(@QueryParam("includeModules") String includeModules) throws JSONException, IOException { - LogicalPlan lp = dagManager.getLogicalPlan(); - return new JSONObject(objectMapper.writeValueAsString(LogicalPlanSerializer.convertToMap(lp))); + return new JSONObject(objectMapper.writeValueAsString(LogicalPlanSerializer.convertToMap( + dagManager.getLogicalPlan(), includeModules != null))); } @POST // not supported by WebAppProxyServlet, can only be called directly
