[ https://issues.apache.org/jira/browse/APEXCORE-276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15075557#comment-15075557 ]
ASF GitHub Bot commented on APEXCORE-276:
-----------------------------------------
Github user tweise commented on a diff in the pull request:
https://github.com/apache/incubator-apex-core/pull/193#discussion_r48641557
--- Diff: engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java ---
@@ -970,4 +969,48 @@ public void onClose(int closeCode, String message)
server.stop();
}
}
+
+ public static class TestMetricTransport implements AutoMetric.Transport, Serializable
+ {
+ private String prefix;
+ private static List<String> messages = new ArrayList<>();
+
+ public TestMetricTransport(String prefix)
+ {
+ this.prefix = prefix;
+ }
+
+ @Override
+ public void push(String jsonData) throws IOException
+ {
+ messages.add(prefix + ":" + jsonData);
+ }
+
+ @Override
+ public long getSchemaResendInterval()
+ {
+ return 0;
+ }
+ }
+
+ @Test
+ public void testCustomMetricsTransport() throws Exception
+ {
+ TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
+ GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
+ dag.addStream("o1.outport", o1.outport, o2.inport1);
+ dag.setAttribute(LogicalPlan.METRICS_TRANSPORT, new TestMetricTransport("xyz"));
+ StramLocalCluster lc = new StramLocalCluster(dag);
+ StreamingContainerManager dnmgr = lc.dnmgr;
+ StramAppContext appContext = new StramTestSupport.TestAppContext();
+
+ AppDataPushAgent pushAgent = new AppDataPushAgent(dnmgr, appContext);
+ pushAgent.init();
+ pushAgent.pushData();
+ Assert.assertTrue(TestMetricTransport.messages.size() > 0);
+ pushAgent.close();
+ String msg = TestMetricTransport.messages.get(0);
+ System.out.println("Got this message: " + msg);
--- End diff --
Not needed; the System.out.println debug line can be removed.
> Make App Data Push transport pluggable and configurable
> -------------------------------------------------------
>
> Key: APEXCORE-276
> URL: https://issues.apache.org/jira/browse/APEXCORE-276
> Project: Apache Apex Core
> Issue Type: New Feature
> Reporter: David Yan
> Assignee: David Yan
>
> Currently it is not possible to plug in your own transport without
> changing the code.
> Code from AppDataPushAgent.java:
> {code}
> public void init()
> {
> String appDataPushTransport = dnmgr.getLogicalPlan().getValue(DAGContext.METRICS_TRANSPORT);
> if (appDataPushTransport.startsWith(APP_DATA_PUSH_TRANSPORT_BUILTIN_VALUE + ":")) {
> String topic = appDataPushTransport.substring(APP_DATA_PUSH_TRANSPORT_BUILTIN_VALUE.length() + 1);
> appDataPusher = new WebsocketAppDataPusher(dnmgr.getWsClient(), topic);
> LOG.info("App Data Push Transport set up for {}", appDataPushTransport);
> } else {
> // TBD add kafka
> LOG.error("App Data Push Transport not recognized: {}", appDataPushTransport);
> }
> }
> {code}
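
For readers skimming the thread, here is a minimal standalone sketch of the pluggable idea: the agent depends only on a transport interface, so a custom implementation can be supplied without touching the agent code. The names PluggableTransportSketch, Transport, RecordingTransport and PushAgent are hypothetical stand-ins and are not the Apex classes; the interface mirrors only the two methods visible in the diff above (push and getSchemaResendInterval). It also asserts on the message content rather than printing it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PluggableTransportSketch {

  /** Hypothetical stand-in for AutoMetric.Transport; mirrors only the methods shown in the diff. */
  interface Transport {
    void push(String jsonData) throws IOException;

    long getSchemaResendInterval();
  }

  /** Custom transport that records each pushed message with a prefix. */
  static class RecordingTransport implements Transport {
    private final String prefix;
    private final List<String> messages = new ArrayList<>();

    RecordingTransport(String prefix) {
      this.prefix = prefix;
    }

    @Override
    public void push(String jsonData) {
      messages.add(prefix + ":" + jsonData);
    }

    @Override
    public long getSchemaResendInterval() {
      return 0;
    }

    List<String> getMessages() {
      return Collections.unmodifiableList(messages);
    }
  }

  /** Hypothetical agent: it knows the interface only, never a concrete transport. */
  static class PushAgent {
    private final Transport transport;

    PushAgent(Transport transport) {
      this.transport = transport;
    }

    void pushData(String jsonData) {
      try {
        transport.push(jsonData);
      } catch (IOException e) {
        // Surface transport failures without forcing callers to handle a checked exception.
        throw new UncheckedIOException(e);
      }
    }
  }

  public static void main(String[] args) {
    RecordingTransport transport = new RecordingTransport("xyz");
    new PushAgent(transport).pushData("{\"metric\":1}");

    // Check the content instead of printing it.
    String msg = transport.getMessages().get(0);
    if (!msg.startsWith("xyz:")) {
      throw new AssertionError("unexpected message: " + msg);
    }
  }
}
```

Keeping the recorded messages as instance state (rather than a static list, as in the test class in the diff) is a choice made for this sketch only, so that separate runs do not share state.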