Repository: incubator-apex-core Updated Branches: refs/heads/release-3.3 c7d27d643 -> fafc4242f
APEXCORE-349 return strings for attribute values in REST service Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/fafc4242 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/fafc4242 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/fafc4242 Branch: refs/heads/release-3.3 Commit: fafc4242fd94714aed4857b552f530cb5f1b84e8 Parents: c7d27d6 Author: David Yan <[email protected]> Authored: Wed Feb 17 13:43:35 2016 -0800 Committer: Thomas Weise <[email protected]> Committed: Fri Feb 19 17:07:36 2016 -0800 ---------------------------------------------------------------------- .../java/com/datatorrent/api/StringCodec.java | 8 +++++++ .../metric/AutoMetricBuiltInTransport.java | 6 +++++ .../stram/StreamingContainerManager.java | 23 +++++++++----------- .../stram/plan/logical/LogicalPlan.java | 2 +- .../com/datatorrent/stram/webapp/AppInfo.java | 6 ++--- .../stram/webapp/StramWebServices.java | 23 ++++++++++++++------ .../stram/StreamingContainerManagerTest.java | 4 ++-- .../stram/support/StramTestSupport.java | 10 ++++----- .../stram/webapp/StramWebServicesTest.java | 19 +++++++++++++++- 9 files changed, 68 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fafc4242/api/src/main/java/com/datatorrent/api/StringCodec.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/StringCodec.java b/api/src/main/java/com/datatorrent/api/StringCodec.java index 72d5f34..8d39320 100644 --- a/api/src/main/java/com/datatorrent/api/StringCodec.java +++ b/api/src/main/java/com/datatorrent/api/StringCodec.java @@ -134,6 +134,11 @@ public interface StringCodec<T> * string as an argument.If properties are specified then properties will be set on the object. The properties * are defined in property=value format separated by colon(:) * + * Note that the {@link #toString(Object) toString} method is by default NOT the proper reverse of the {@link + * #fromString(String) fromString} method. In order for the {@link #toString(Object) toString} method to become a + * proper reverse of the {@link #fromString(String) fromString} method, T's {@link T#toString() toString} method + * must output null or <Constructor_String> or the <Constructor_String>:<Property_String> format as stated above. + * * @param <T> Type of the object which is converted to/from String */ public class Object2String<T> implements StringCodec<T>, Serializable @@ -195,6 +200,9 @@ public interface StringCodec<T> @Override public String toString(T pojo) { + if (pojo == null) { + return null; + } String arg = pojo.toString(); if (arg == null) { return pojo.getClass().getCanonicalName(); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fafc4242/common/src/main/java/com/datatorrent/common/metric/AutoMetricBuiltInTransport.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/datatorrent/common/metric/AutoMetricBuiltInTransport.java b/common/src/main/java/com/datatorrent/common/metric/AutoMetricBuiltInTransport.java index 1a71ee1..ebd1e7f 100644 --- a/common/src/main/java/com/datatorrent/common/metric/AutoMetricBuiltInTransport.java +++ b/common/src/main/java/com/datatorrent/common/metric/AutoMetricBuiltInTransport.java @@ -48,6 +48,12 @@ public class AutoMetricBuiltInTransport implements AutoMetric.Transport, Seriali } @Override + public String toString() + { + return this.topic; + } + + @Override public void push(String jsonData) throws IOException { throw new UnsupportedOperationException("This class is a placeholder and is supposed to replaced by internal " + http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fafc4242/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index df3bfc4..0e7091b 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -2837,7 +2837,7 @@ public class StreamingContainerManager implements PlanContext } } - public Map<String, Object> getPortAttributes(String operatorId, String portName) + public Attribute.AttributeMap getPortAttributes(String operatorId, String portName) { OperatorMeta logicalOperator = plan.getLogicalPlan().getOperatorMeta(operatorId); if (logicalOperator == null) { @@ -2848,24 +2848,21 @@ public class StreamingContainerManager implements PlanContext Operators.describe(logicalOperator.getOperator(), portMap); PortContextPair<InputPort<?>> inputPort = portMap.inputPorts.get(portName); if (inputPort != null) { - HashMap<String, Object> portAttributeMap = new HashMap<String, Object>(); InputPortMeta portMeta = logicalOperator.getMeta(inputPort.component); - Map<Attribute<Object>, Object> rawAttributes = Attribute.AttributeMap.AttributeInitializer.getAllAttributes(portMeta, Context.PortContext.class); - for (Map.Entry<Attribute<Object>, Object> attEntry : rawAttributes.entrySet()) { - portAttributeMap.put(attEntry.getKey().getSimpleName(), attEntry.getValue()); + try { + return portMeta.getAttributes().clone(); + } catch (CloneNotSupportedException ex) { + throw new RuntimeException("Cannot clone port attributes", ex); } - return portAttributeMap; - } - else { + } else { PortContextPair<OutputPort<?>> outputPort = portMap.outputPorts.get(portName); if (outputPort != null) { - HashMap<String, Object> portAttributeMap = new HashMap<String, Object>(); OutputPortMeta portMeta = logicalOperator.getMeta(outputPort.component); - Map<Attribute<Object>, Object> rawAttributes = Attribute.AttributeMap.AttributeInitializer.getAllAttributes(portMeta, Context.PortContext.class); - for (Map.Entry<Attribute<Object>, Object> attEntry : rawAttributes.entrySet()) { - portAttributeMap.put(attEntry.getKey().getSimpleName(), attEntry.getValue()); + try { + return portMeta.getAttributes().clone(); + } catch (CloneNotSupportedException ex) { + throw new RuntimeException("Cannot clone port attributes", ex); } - return portAttributeMap; } throw new IllegalArgumentException("Invalid port name " + portName); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fafc4242/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index 6d7ebe1..8fa71bb 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -1923,7 +1923,7 @@ public class LogicalPlan implements Serializable, DAG * @see <a href="http://en.wikipedia.org/wiki/Tarjan%E2%80%99s_strongly_connected_components_algorithm">http://en.wikipedia.org/wiki/Tarjan%E2%80%99s_strongly_connected_components_algorithm</a> * * @param om - * @param cycles + * @param ctx */ public void findStronglyConnected(OperatorMeta om, ValidationContext ctx) { http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fafc4242/engine/src/main/java/com/datatorrent/stram/webapp/AppInfo.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/AppInfo.java b/engine/src/main/java/com/datatorrent/stram/webapp/AppInfo.java index 3e6a4af..109a401 100644 --- a/engine/src/main/java/com/datatorrent/stram/webapp/AppInfo.java +++ b/engine/src/main/java/com/datatorrent/stram/webapp/AppInfo.java @@ -60,7 +60,7 @@ public class AppInfo { protected boolean gatewayConnected; protected List<AppDataSource> appDataSources; protected Map<String, Object> metrics; - public Map<String, Object> attributes; + public Map<String, String> attributes; public String appMasterTrackingUrl; public String version; public AppStats stats; @@ -215,9 +215,9 @@ public class AppInfo { this.stats = context.getStats(); this.gatewayAddress = context.getGatewayAddress(); this.version = VersionInfo.getBuildVersion(); - this.attributes = new TreeMap<String, Object>(); + this.attributes = new TreeMap<>(); for (Map.Entry<Attribute<Object>, Object> entry : AttributeMap.AttributeInitializer.getAllAttributes(context, DAGContext.class).entrySet()) { - this.attributes.put(entry.getKey().getSimpleName(), entry.getValue()); + this.attributes.put(entry.getKey().getSimpleName(), entry.getKey().codec.toString(entry.getValue())); } this.gatewayConnected = context.isGatewayConnected(); this.appDataSources = context.getAppDataSources(); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fafc4242/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java index fd47d35..f735253 100644 --- a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java +++ b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java @@ -602,10 +602,11 @@ public class StramWebServices if (logicalOperator == null) { throw new NotFoundException(); } - HashMap<String, Object> map = new HashMap<String, Object>(); + HashMap<String, String> map = new HashMap<>(); for (Entry<Attribute<?>, Object> entry : dagManager.getOperatorAttributes(operatorName).entrySet()) { - if (attributeName == null || entry.getKey().name.equals(attributeName)) { - map.put(entry.getKey().name, entry.getValue()); + if (attributeName == null || entry.getKey().getSimpleName().equals(attributeName)) { + Entry<Attribute<Object>, Object> entry1 = (Entry<Attribute<Object>, Object>)(Entry)entry; + map.put(entry1.getKey().getSimpleName(), entry1.getKey().codec.toString(entry1.getValue())); } } return new JSONObject(map); @@ -616,10 +617,11 @@ public class StramWebServices @Produces(MediaType.APPLICATION_JSON) public JSONObject getApplicationAttributes(@QueryParam("attributeName") String attributeName) { - HashMap<String, Object> map = new HashMap<String, Object>(); + HashMap<String, String> map = new HashMap<>(); for (Entry<Attribute<?>, Object> entry : dagManager.getApplicationAttributes().entrySet()) { - if (attributeName == null || entry.getKey().name.equals(attributeName)) { - map.put(entry.getKey().name, entry.getValue()); + if (attributeName == null || entry.getKey().getSimpleName().equals(attributeName)) { + Entry<Attribute<Object>, Object> entry1 = (Entry<Attribute<Object>, Object>)(Entry)entry; + map.put(entry1.getKey().getSimpleName(), entry1.getKey().codec.toString(entry1.getValue())); } } return new JSONObject(map); @@ -736,7 +738,14 @@ public class StramWebServices if (logicalOperator == null) { throw new NotFoundException(); } - return new JSONObject(dagManager.getPortAttributes(operatorName, portName)); + HashMap<String, String> map = new HashMap<>(); + for (Entry<Attribute<?>, Object> entry : dagManager.getPortAttributes(operatorName, portName).entrySet()) { + if (attributeName == null || entry.getKey().getSimpleName().equals(attributeName)) { + Entry<Attribute<Object>, Object> entry1 = (Entry<Attribute<Object>, Object>)(Entry)entry; + map.put(entry1.getKey().getSimpleName(), entry1.getKey().codec.toString(entry1.getValue())); + } + } + return new JSONObject(map); } @GET http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fafc4242/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java index 8fc957b..5edc582 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java @@ -938,7 +938,7 @@ public class StreamingContainerManagerTest dag.setAttribute(LogicalPlan.GATEWAY_CONNECT_ADDRESS, "localhost:" + port); StramLocalCluster lc = new StramLocalCluster(dag); StreamingContainerManager dnmgr = lc.dnmgr; - StramAppContext appContext = new StramTestSupport.TestAppContext(); + StramAppContext appContext = new StramTestSupport.TestAppContext(dag.getAttributes()); AppDataPushAgent pushAgent = new AppDataPushAgent(dnmgr, appContext); pushAgent.init(); @@ -1000,7 +1000,7 @@ public class StreamingContainerManagerTest dag.setAttribute(LogicalPlan.METRICS_TRANSPORT, new TestMetricTransport("xyz")); StramLocalCluster lc = new StramLocalCluster(dag); StreamingContainerManager dnmgr = lc.dnmgr; - StramAppContext appContext = new StramTestSupport.TestAppContext(); + StramAppContext appContext = new StramTestSupport.TestAppContext(dag.getAttributes()); AppDataPushAgent pushAgent = new AppDataPushAgent(dnmgr, appContext); pushAgent.init(); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fafc4242/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java b/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java index cf2a887..ad66ede 100644 --- a/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java +++ b/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java @@ -130,9 +130,7 @@ abstract public class StramTestSupport /** * Create an appPackage zip using the sample appPackage located in * src/test/resources/testAppPackage/testAppPackageSrc. - * @param file The file whose path will be used to create the appPackage zip * @return The File object that can be used in the AppPackage constructor. - * @throws net.lingala.zip4j.exception.ZipException */ public static File createAppPackageFile() { @@ -479,16 +477,16 @@ abstract public class StramTestSupport final long startTime = System.currentTimeMillis(); final String gatewayAddress = "localhost:9090"; - public TestAppContext(int appid, int numJobs, int numTasks, int numAttempts) + public TestAppContext(Attribute.AttributeMap attributeMap, int appid, int numJobs, int numTasks, int numAttempts) { - super(new Attribute.AttributeMap.DefaultAttributeMap(), null); // this needs to be done in a proper way - may cause application errors. + super(attributeMap, null); // this needs to be done in a proper way - may cause application errors. this.appID = ApplicationId.newInstance(0, appid); this.appAttemptID = ApplicationAttemptId.newInstance(this.appID, numAttempts); } - public TestAppContext() + public TestAppContext(Attribute.AttributeMap attributeMap) { - this(0, 1, 1, 1); + this(attributeMap, 0, 1, 1, 1); } @Override http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fafc4242/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java index d44be87..c083891 100644 --- a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java +++ b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java @@ -46,6 +46,7 @@ import com.sun.jersey.test.framework.WebAppDescriptor; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; +import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; @@ -60,6 +61,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import com.datatorrent.api.Context; +import com.datatorrent.common.metric.AutoMetricBuiltInTransport; import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.stram.StramAppContext; import com.datatorrent.stram.StreamingContainerManager; @@ -133,9 +135,10 @@ public class StramWebServicesTest extends JerseyTest String workingDir = new File("target", StramWebServicesTest.class.getName()).getAbsolutePath(); dag.setAttribute(LogicalPlan.APPLICATION_PATH, workingDir); dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null)); + dag.setAttribute(LogicalPlan.METRICS_TRANSPORT, new AutoMetricBuiltInTransport("xyz")); final DummyStreamingContainerManager streamingContainerManager = new DummyStreamingContainerManager(dag); - appContext = new TestAppContext(); + appContext = new TestAppContext(dag.getAttributes()); bind(JAXBContextResolver.class); bind(StramWebServices.class); bind(GenericExceptionHandler.class); @@ -332,6 +335,20 @@ public class StramWebServicesTest extends JerseyTest } } + + @Test + public void testAttributes() throws Exception + { + WebResource r = resource(); + ClientResponse response = r.path(StramWebServices.PATH + "/") + .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + JSONObject attrs = json.getJSONObject("attributes"); + Assert.assertEquals(AutoMetricBuiltInTransport.class.getName() + ":xyz", + attrs.getString(Context.DAGContext.METRICS_TRANSPORT.getSimpleName())); + } + @Test public void testSubmitLogicalPlanChange() throws JSONException, Exception {
