APEXCORE-276 made METRICS_TRANSPORT pluggable
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/845796ab Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/845796ab Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/845796ab Branch: refs/heads/master Commit: 845796abc5f5d1f92f43b40e105f1b18cb2d8632 Parents: 56b55fe Author: David Yan <[email protected]> Authored: Tue Dec 29 13:46:58 2015 -0800 Committer: David Yan <[email protected]> Committed: Wed Dec 30 16:39:40 2015 -0800 ---------------------------------------------------------------------- .../java/com/datatorrent/api/AutoMetric.java | 19 ++++++ .../main/java/com/datatorrent/api/Context.java | 6 +- .../metric/AutoMetricBuiltInTransport.java | 67 +++++++++++++++++++ .../stram/PubSubWebSocketMetricTransport.java | 68 ++++++++++++++++++++ .../stram/StreamingAppMasterService.java | 2 +- .../stram/WebsocketAppDataPusher.java | 65 ------------------- .../datatorrent/stram/api/AppDataPusher.java | 34 ---------- .../stram/appdata/AppDataPushAgent.java | 34 +++++----- .../stram/StreamingContainerManagerTest.java | 62 ++++++++++++++---- 9 files changed, 225 insertions(+), 132 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/845796ab/api/src/main/java/com/datatorrent/api/AutoMetric.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/AutoMetric.java b/api/src/main/java/com/datatorrent/api/AutoMetric.java index 1c1fb25..74d062a 100644 --- a/api/src/main/java/com/datatorrent/api/AutoMetric.java +++ b/api/src/main/java/com/datatorrent/api/AutoMetric.java @@ -18,6 +18,7 @@ */ package com.datatorrent.api; +import java.io.IOException; import java.lang.annotation.*; import java.util.Collection; import java.util.Map; @@ -106,4 +107,22 @@ public @interface AutoMetric String[] getDimensionAggregationsFor(String logicalMetricName); } + /** + * Interface of transport for STRAM to push metrics data + */ + interface Transport + { + /** + * Pushes the metrics data (in JSON) to the transport. + * + * @param jsonData The metric data in JSON to be pushed to this transport + */ + void push(String jsonData) throws IOException; + + /** + * Returns the number of milliseconds for resending the metric schema. The schema will need to be resent for + * unreliable transport. Return 0 if the schema does not need to be resent. + */ + long getSchemaResendInterval(); + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/845796ab/api/src/main/java/com/datatorrent/api/Context.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java index 2054920..6092dce 100644 --- a/api/src/main/java/com/datatorrent/api/Context.java +++ b/api/src/main/java/com/datatorrent/api/Context.java @@ -334,10 +334,10 @@ public interface Context */ Attribute<String> APPLICATION_DATA_LINK = new Attribute<String>(new String2String()); /** - * Transport to push the stats and the metrics, "builtin:{topic}" if STRAM should push the data directly - * using websocket with the given topic + * Transport to push the stats and the metrics. + * If using the built-in transport, please use an AutoMetricBuiltInTransport object */ - Attribute<String> METRICS_TRANSPORT = new Attribute<String>(new String2String()); + Attribute<AutoMetric.Transport> METRICS_TRANSPORT = new Attribute<>(new Object2String<AutoMetric.Transport>()); /** * Application instance identifier. An application with the same name can run in multiple instances, each with a * unique identifier. The identifier is set by the client that submits the application and can be used in operators http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/845796ab/common/src/main/java/com/datatorrent/common/metric/AutoMetricBuiltInTransport.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/datatorrent/common/metric/AutoMetricBuiltInTransport.java b/common/src/main/java/com/datatorrent/common/metric/AutoMetricBuiltInTransport.java new file mode 100644 index 0000000..ee9cbdd --- /dev/null +++ b/common/src/main/java/com/datatorrent/common/metric/AutoMetricBuiltInTransport.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.common.metric; + +import java.io.IOException; +import java.io.Serializable; + +import com.datatorrent.api.AutoMetric; + +/** + * AutoMetricBuiltinTransport. This will be replaced by the internal websocket pubsub transport + * provided here: {@link com.datatorrent.stram.PubSubWebSocketMetricTransport}. + */ +public class AutoMetricBuiltInTransport implements AutoMetric.Transport, Serializable +{ + private final String topic; + private final long schemaResendInterval; + private static final long DEFAULT_SCHEMA_RESEND_INTERVAL = 10000; + + public AutoMetricBuiltInTransport(String topic) + { + this.topic = topic; + this.schemaResendInterval = DEFAULT_SCHEMA_RESEND_INTERVAL; + } + + public AutoMetricBuiltInTransport(String topic, long schemaResendInterval) + { + this.topic = topic; + this.schemaResendInterval = schemaResendInterval; + } + + @Override + public void push(String jsonData) throws IOException + { + throw new UnsupportedOperationException("This class is a placeholder and is supposed to replaced by internal " + + "implementation."); + } + + @Override + public long getSchemaResendInterval() + { + return schemaResendInterval; + } + + public String getTopic() + { + return topic; + } + + private static final long serialVersionUID = 201512301009L; +} http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/845796ab/engine/src/main/java/com/datatorrent/stram/PubSubWebSocketMetricTransport.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/PubSubWebSocketMetricTransport.java b/engine/src/main/java/com/datatorrent/stram/PubSubWebSocketMetricTransport.java new file mode 100644 index 0000000..85b8006 --- /dev/null +++ b/engine/src/main/java/com/datatorrent/stram/PubSubWebSocketMetricTransport.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.stram; + +import java.io.IOException; +import java.io.Serializable; + +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.common.util.PubSubWebSocketClient; + +/** + * <p>PubSubWebSocketMetricTransport class.</p> + * + * @since 3.0.0 + */ +public class PubSubWebSocketMetricTransport implements AutoMetric.Transport, Serializable +{ + private final String topic; + private final long schemaResendInterval; + protected PubSubWebSocketClient client; + + public PubSubWebSocketMetricTransport(PubSubWebSocketClient wsClient, String topic, long schemaResendInterval) + { + client = wsClient; + this.topic = topic; + this.schemaResendInterval = schemaResendInterval; + } + + @Override + public void push(String msg) throws IOException + { + try { + client.publish(topic, new JSONObject(msg)); + } catch (JSONException ex) { + throw new RuntimeException(ex); + } + } + + @Override + public long getSchemaResendInterval() + { + return schemaResendInterval; + } + + private static final Logger LOG = LoggerFactory.getLogger(PubSubWebSocketMetricTransport.class); + private static final long serialVersionUID = 201512301008L; +} http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/845796ab/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index 5d84e10..db8c255 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -536,7 +536,7 @@ public class StreamingAppMasterService extends CompositeService this.heartbeatListener = new StreamingContainerParent(this.getClass().getName(), dnmgr, delegationTokenManager, rpcListenerCount); addService(heartbeatListener); - String appDataPushTransport = dag.getValue(LogicalPlan.METRICS_TRANSPORT); + AutoMetric.Transport appDataPushTransport = dag.getValue(LogicalPlan.METRICS_TRANSPORT); if (appDataPushTransport != null) { this.appDataPushAgent = new AppDataPushAgent(dnmgr, appContext); addService(this.appDataPushAgent); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/845796ab/engine/src/main/java/com/datatorrent/stram/WebsocketAppDataPusher.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/WebsocketAppDataPusher.java b/engine/src/main/java/com/datatorrent/stram/WebsocketAppDataPusher.java deleted file mode 100644 index f052ded..0000000 --- a/engine/src/main/java/com/datatorrent/stram/WebsocketAppDataPusher.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.stram; - -import com.datatorrent.common.util.PubSubWebSocketClient; -import com.datatorrent.stram.api.AppDataPusher; -import java.io.IOException; -import org.codehaus.jettison.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * <p>WebsocketAppDataPusher class.</p> - * - * @since 3.0.0 - */ -public class WebsocketAppDataPusher implements AppDataPusher -{ - private final String topic; - private long resendSchemaInterval = 10000; // 10 seconds - protected PubSubWebSocketClient client; - - - public WebsocketAppDataPusher(PubSubWebSocketClient wsClient, String topic) - { - client = wsClient; - this.topic = topic; - } - - public void setResendSchemaInterval(long resendSchemaInterval) - { - this.resendSchemaInterval = resendSchemaInterval; - } - - @Override - public void push(JSONObject msg) throws IOException - { - client.publish(topic, msg); - } - - @Override - public long getResendSchemaInterval() - { - return resendSchemaInterval; - } - - private static final Logger LOG = LoggerFactory.getLogger(WebsocketAppDataPusher.class); - -} http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/845796ab/engine/src/main/java/com/datatorrent/stram/api/AppDataPusher.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/api/AppDataPusher.java b/engine/src/main/java/com/datatorrent/stram/api/AppDataPusher.java deleted file mode 100644 index 25cc2cf..0000000 --- a/engine/src/main/java/com/datatorrent/stram/api/AppDataPusher.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.stram.api; - -import java.io.IOException; -import org.codehaus.jettison.json.JSONObject; - -/** - * <p>AppDataPusher interface.</p> - * - * @since 3.0.0 - */ -public interface AppDataPusher -{ - public void push(JSONObject appData) throws IOException; - - public long getResendSchemaInterval(); -} http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/845796ab/engine/src/main/java/com/datatorrent/stram/appdata/AppDataPushAgent.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/appdata/AppDataPushAgent.java b/engine/src/main/java/com/datatorrent/stram/appdata/AppDataPushAgent.java index 9389e3c..7fd4c3c 100644 --- a/engine/src/main/java/com/datatorrent/stram/appdata/AppDataPushAgent.java +++ b/engine/src/main/java/com/datatorrent/stram/appdata/AppDataPushAgent.java @@ -41,11 +41,12 @@ import com.google.common.collect.Maps; import com.datatorrent.api.AutoMetric; import com.datatorrent.api.Context.DAGContext; +import com.datatorrent.api.StringCodec; +import com.datatorrent.common.metric.AutoMetricBuiltInTransport; import com.datatorrent.common.util.Pair; +import com.datatorrent.stram.PubSubWebSocketMetricTransport; import com.datatorrent.stram.StramAppContext; import com.datatorrent.stram.StreamingContainerManager; -import com.datatorrent.stram.WebsocketAppDataPusher; -import com.datatorrent.stram.api.AppDataPusher; import com.datatorrent.stram.plan.logical.LogicalPlan; import com.datatorrent.stram.webapp.LogicalOperatorInfo; @@ -64,7 +65,7 @@ public class AppDataPushAgent extends AbstractService private final StreamingContainerManager dnmgr; private final StramAppContext appContext; private final AppDataPushThread appDataPushThread = new AppDataPushThread(); - private AppDataPusher appDataPusher; + private AutoMetric.Transport metricsTransport; private final Map<Class<?>, List<Field>> cacheFields = new HashMap<Class<?>, List<Field>>(); private final Map<Class<?>, Map<String, Method>> cacheGetMethods = new HashMap<Class<?>, Map<String, Method>>(); @@ -81,7 +82,7 @@ public class AppDataPushAgent extends AbstractService @Override protected void serviceStop() throws Exception { - if (appDataPusher != null) { + if (metricsTransport != null) { appDataPushThread.interrupt(); try { appDataPushThread.join(); @@ -95,7 +96,7 @@ public class AppDataPushAgent extends AbstractService @Override protected void serviceStart() throws Exception { - if (appDataPusher != null) { + if (metricsTransport != null) { appDataPushThread.start(); } super.serviceStart(); @@ -110,15 +111,12 @@ public class AppDataPushAgent extends AbstractService public void init() { - String appDataPushTransport = dnmgr.getLogicalPlan().getValue(DAGContext.METRICS_TRANSPORT); - if (appDataPushTransport.startsWith(APP_DATA_PUSH_TRANSPORT_BUILTIN_VALUE + ":")) { - String topic = appDataPushTransport.substring(APP_DATA_PUSH_TRANSPORT_BUILTIN_VALUE.length() + 1); - appDataPusher = new WebsocketAppDataPusher(dnmgr.getWsClient(), topic); - LOG.info("App Data Push Transport set up for {}", appDataPushTransport); - } else { - // TBD add kakfa - LOG.error("App Data Push Transport not recognized: {}", appDataPushTransport); + metricsTransport = dnmgr.getLogicalPlan().getValue(DAGContext.METRICS_TRANSPORT); + if (metricsTransport instanceof AutoMetricBuiltInTransport) { + AutoMetricBuiltInTransport transport = (AutoMetricBuiltInTransport)metricsTransport; + metricsTransport = new PubSubWebSocketMetricTransport(dnmgr.getWsClient(), transport.getTopic(), transport.getSchemaResendInterval()); } + LOG.info("Metrics Transport set up for {}", metricsTransport); } private JSONObject getPushData() @@ -132,7 +130,6 @@ public class AppDataPushAgent extends AbstractService json.put("appUser", appContext.getUser()); List<LogicalOperatorInfo> logicalOperatorInfoList = dnmgr.getLogicalOperatorInfoList(); JSONObject logicalOperators = new JSONObject(); - long resendSchemaInterval = appDataPusher.getResendSchemaInterval(); for (LogicalOperatorInfo logicalOperator : logicalOperatorInfoList) { JSONObject logicalOperatorJson = extractFields(logicalOperator); JSONArray metricsList = new JSONArray(); @@ -144,8 +141,9 @@ public class AppDataPushAgent extends AbstractService // metric name, aggregated value Map<String, Object> aggregates = metrics.second; long now = System.currentTimeMillis(); - if (!operatorsSchemaLastSentTime.containsKey(logicalOperator.name) - || operatorsSchemaLastSentTime.get(logicalOperator.name) < now - resendSchemaInterval) { + if (!operatorsSchemaLastSentTime.containsKey(logicalOperator.name) || + (metricsTransport.getSchemaResendInterval() > 0 && + operatorsSchemaLastSentTime.get(logicalOperator.name) < now - metricsTransport.getSchemaResendInterval())) { try { pushMetricsSchema(dnmgr.getLogicalPlan().getOperatorMeta(logicalOperator.name), aggregates); operatorsSchemaLastSentTime.put(logicalOperator.name, now); @@ -287,12 +285,12 @@ public class AppDataPushAgent extends AbstractService schema = getMetricsSchemaData(operatorMeta, aggregates); operatorSchemas.put(operatorMeta.getName(), schema); } - appDataPusher.push(schema); + metricsTransport.push(schema.toString()); } public void pushData() throws IOException { - appDataPusher.push(getPushData()); + metricsTransport.push(getPushData().toString()); } public class AppDataPushThread extends Thread http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/845796ab/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java index f1d6ec4..8fc957b 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java @@ -19,6 +19,7 @@ package com.datatorrent.stram; import java.io.IOException; +import java.io.Serializable; import java.net.InetSocketAddress; import java.util.*; import java.util.concurrent.Future; @@ -33,6 +34,8 @@ import org.junit.Test; import com.google.common.collect.Lists; import com.google.common.collect.Sets; + +import com.datatorrent.api.AutoMetric; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.Context.PortContext; import com.datatorrent.api.DAG.Locality; @@ -40,6 +43,7 @@ import com.datatorrent.api.Stats.OperatorStats; import com.datatorrent.api.Stats.OperatorStats.PortStats; import com.datatorrent.api.StatsListener; import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.common.metric.AutoMetricBuiltInTransport; import com.datatorrent.common.partitioner.StatelessPartitioner; import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.common.util.FSStorageAgent; @@ -75,7 +79,6 @@ import com.datatorrent.stram.support.StramTestSupport.TestMeta; import com.datatorrent.stram.tuple.Tuple; import org.apache.commons.lang.StringUtils; -import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.eclipse.jetty.websocket.WebSocket; @@ -904,7 +907,7 @@ public class StreamingContainerManagerTest public void testAppDataPush() throws Exception { final String topic = "xyz"; - final List<JSONObject> messages = new ArrayList<JSONObject>(); + final List<String> messages = new ArrayList<>(); EmbeddedWebSocketServer server = new EmbeddedWebSocketServer(0); server.setWebSocket(new WebSocket.OnTextMessage() { @@ -912,11 +915,7 @@ public class StreamingContainerManagerTest @Override public void onMessage(String data) { - try { - messages.add(new JSONObject(data)); - } catch (JSONException ex) { - throw new RuntimeException(ex); - } + messages.add(data); } @Override @@ -935,10 +934,9 @@ public class StreamingContainerManagerTest TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); dag.addStream("o1.outport", o1.outport, o2.inport1); - dag.setAttribute(LogicalPlan.METRICS_TRANSPORT, "builtin:" + topic); + dag.setAttribute(LogicalPlan.METRICS_TRANSPORT, new AutoMetricBuiltInTransport(topic)); dag.setAttribute(LogicalPlan.GATEWAY_CONNECT_ADDRESS, "localhost:" + port); StramLocalCluster lc = new StramLocalCluster(dag); - //lc.runAsync(); StreamingContainerManager dnmgr = lc.dnmgr; StramAppContext appContext = new StramTestSupport.TestAppContext(); @@ -948,8 +946,7 @@ public class StreamingContainerManagerTest Thread.sleep(1000); Assert.assertTrue(messages.size() > 0); pushAgent.close(); - JSONObject message = messages.get(0); - System.out.println("Got this message: " + message.toString(2)); + JSONObject message = new JSONObject(messages.get(0)); Assert.assertEquals(topic, message.getString("topic")); Assert.assertEquals("publish", message.getString("type")); JSONObject data = message.getJSONObject("data"); @@ -970,4 +967,47 @@ public class StreamingContainerManagerTest server.stop(); } } + + public static class TestMetricTransport implements AutoMetric.Transport, Serializable + { + private String prefix; + private static List<String> messages = new ArrayList<>(); + + public TestMetricTransport(String prefix) + { + this.prefix = prefix; + } + + @Override + public void push(String jsonData) throws IOException + { + messages.add(prefix + ":" + jsonData); + } + + @Override + public long getSchemaResendInterval() + { + return 0; + } + } + + @Test + public void testCustomMetricsTransport() throws Exception + { + TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); + GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); + dag.addStream("o1.outport", o1.outport, o2.inport1); + dag.setAttribute(LogicalPlan.METRICS_TRANSPORT, new TestMetricTransport("xyz")); + StramLocalCluster lc = new StramLocalCluster(dag); + StreamingContainerManager dnmgr = lc.dnmgr; + StramAppContext appContext = new StramTestSupport.TestAppContext(); + + AppDataPushAgent pushAgent = new AppDataPushAgent(dnmgr, appContext); + pushAgent.init(); + pushAgent.pushData(); + Assert.assertTrue(TestMetricTransport.messages.size() > 0); + pushAgent.close(); + String msg = TestMetricTransport.messages.get(0); + Assert.assertTrue(msg.startsWith("xyz:")); + } }
