Repository: incubator-apex-malhar Updated Branches: refs/heads/master 79c643b95 -> e38e144f2
APEXMALHAR-2067 Necessary changes to work with Apex Core 3.4.0 due to ning.com:async-http-client and OperatorContext Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/188afbe8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/188afbe8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/188afbe8 Branch: refs/heads/master Commit: 188afbe84982fe91252d8af3e617686ab711ae30 Parents: 79c643b Author: David Yan <da...@datatorrent.com> Authored: Thu Mar 17 15:15:31 2016 -0700 Committer: David Yan <da...@datatorrent.com> Committed: Tue May 10 10:58:10 2016 -0700 ---------------------------------------------------------------------- .../HBaseTransactionalPutOperatorTest.java | 18 +++++ library/pom.xml | 5 ++ .../lib/io/PubSubWebSocketInputOperator.java | 3 +- .../lib/io/PubSubWebSocketOutputOperator.java | 3 +- .../lib/io/WebSocketInputOperator.java | 16 ++--- .../lib/io/WebSocketOutputOperator.java | 38 +++++------ .../lib/io/WidgetOutputOperator.java | 71 +++++++++++--------- .../lib/helper/OperatorContextTestHelper.java | 5 ++ pom.xml | 8 ++- 9 files changed, 96 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/188afbe8/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java index e34c482..eef69d4 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java @@ -86,6 +86,12 @@ public class HBaseTransactionalPutOperatorTest { public void sendMetrics(Collection<String> collection) { } + + @Override + public int getWindowsFromCheckpoint() + { + return 0; + } }); thop.beginWindow(0); thop.input.process(t1); @@ -150,6 +156,12 @@ public class HBaseTransactionalPutOperatorTest { public void sendMetrics(Collection<String> collection) { } + + @Override + public int getWindowsFromCheckpoint() + { + return 0; + } }); thop.beginWindow(0); thop.input.process(t1); @@ -218,6 +230,12 @@ public class HBaseTransactionalPutOperatorTest { public void sendMetrics(Collection<String> collection) { } + + @Override + public int getWindowsFromCheckpoint() + { + return 0; + } }); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/188afbe8/library/pom.xml ---------------------------------------------------------------------- diff --git a/library/pom.xml b/library/pom.xml index d11d80c..cd4d2a4 100644 --- a/library/pom.xml +++ b/library/pom.xml @@ -334,6 +334,11 @@ <artifactId>fastutil</artifactId> <version>7.0.6</version> </dependency> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>apex-shaded-ning19</artifactId> + <version>1.0.0</version> + </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/188afbe8/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketInputOperator.java index cdf2ee5..0b56924 100644 --- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketInputOperator.java @@ -28,7 +28,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.datatorrent.common.util.PubSubMessageCodec; -import com.datatorrent.common.util.PubSubWebSocketClient; /** * This operator reads JSON objects from the given URL and converts them into maps. @@ -82,7 +81,7 @@ public class PubSubWebSocketInputOperator<T> extends WebSocketInputOperator<T> { super.run(); try { - connection.sendTextMessage(PubSubWebSocketClient.constructSubscribeMessage(topic, codec)); + connection.sendMessage(PubSubMessageCodec.constructSubscribeMessage(topic, codec)); } catch (IOException ex) { LOG.error("Exception caught", ex); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/188afbe8/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketOutputOperator.java index b704191..c51c0c9 100644 --- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketOutputOperator.java @@ -23,7 +23,6 @@ import java.io.IOException; import javax.validation.constraints.NotNull; import com.datatorrent.common.util.PubSubMessageCodec; -import com.datatorrent.common.util.PubSubWebSocketClient; /** * This operator writes maps as JSON objects to the given URL. @@ -63,7 +62,7 @@ public class PubSubWebSocketOutputOperator<T> extends WebSocketOutputOperator<T> @Override public String convertMapToMessage(T t) throws IOException { - return PubSubWebSocketClient.constructPublishMessage(topic, t, codec); + return PubSubMessageCodec.constructPublishMessage(topic, t, codec); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/188afbe8/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java index a2e7165..f805dcf 100644 --- a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java @@ -31,11 +31,11 @@ import org.slf4j.LoggerFactory; import org.apache.commons.lang3.ClassUtils; -import com.ning.http.client.AsyncHttpClient; -import com.ning.http.client.AsyncHttpClientConfigBean; -import com.ning.http.client.websocket.WebSocket; -import com.ning.http.client.websocket.WebSocketTextListener; -import com.ning.http.client.websocket.WebSocketUpgradeHandler; +import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHttpClient; +import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHttpClientConfigBean; +import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocket; +import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketTextListener; +import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketUpgradeHandler; import com.datatorrent.api.Context.OperatorContext; @@ -223,12 +223,6 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T> } @Override - public void onFragment(String string, boolean bln) - { - LOG.debug("onFragment"); - } - - @Override public void onOpen(WebSocket ws) { LOG.debug("Connection opened"); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/188afbe8/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java index 134c913..b793183 100644 --- a/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java @@ -20,20 +20,24 @@ package com.datatorrent.lib.io; import java.io.IOException; import java.net.URI; -import java.util.concurrent.*; - -import com.ning.http.client.AsyncHttpClient; -import com.ning.http.client.AsyncHttpClientConfigBean; -import com.ning.http.client.websocket.WebSocket; -import com.ning.http.client.websocket.WebSocketTextListener; -import com.ning.http.client.websocket.WebSocketUpgradeHandler; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHttpClient; +import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHttpClientConfigBean; +import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocket; +import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketTextListener; +import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketUpgradeHandler; import org.apache.commons.lang3.ClassUtils; + import com.datatorrent.common.util.BaseOperator; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultInputPort; @@ -159,10 +163,9 @@ public class WebSocketOutputOperator<T> extends BaseOperator client.close(); openConnection(); } - connection.sendTextMessage(convertMapToMessage(t)); + connection.sendMessage(convertMapToMessage(t)); break; - } - catch (Exception ex) { + } catch (Exception ex) { if (++countTries < numRetries) { LOG.debug("Caught exception", ex); LOG.warn("Send message failed ({}). Retrying ({}).", ex.getMessage(), countTries); @@ -172,12 +175,11 @@ public class WebSocketOutputOperator<T> extends BaseOperator if (waitMillisRetry > 0) { try { Thread.sleep(waitMillisRetry); - } - catch (InterruptedException ex1) { + } catch (InterruptedException ex1) { + // continue } } - } - else { + } else { throw new RuntimeException(ex); } } @@ -212,11 +214,6 @@ public class WebSocketOutputOperator<T> extends BaseOperator } @Override - public void onFragment(String string, boolean bln) - { - } - - @Override public void onOpen(WebSocket ws) { LOG.debug("Connection opened"); @@ -242,8 +239,7 @@ public class WebSocketOutputOperator<T> extends BaseOperator { try { openConnection(); - } - catch (Exception ex) { + } catch (Exception ex) { LOG.warn("Cannot establish connection:", ex); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/188afbe8/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java index df8b5af..5b6259c 100644 --- a/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java @@ -38,7 +38,6 @@ import com.datatorrent.api.annotation.InputPortFieldAnnotation; import com.datatorrent.common.util.BaseOperator; import com.datatorrent.common.util.PubSubMessageCodec; -import com.datatorrent.common.util.PubSubWebSocketClient; /** * This operator outputs data in a format that can be displayed in DT UI widgets. @@ -63,12 +62,13 @@ public class WidgetOutputOperator extends BaseOperator { protected transient WebSocketOutputOperator<Pair<String, Object>> wsoo = new WebSocketOutputOperator<Pair<String,Object>>(){ - private transient PubSubMessageCodec<Object> codec = new PubSubMessageCodec<Object>(mapper); + private transient PubSubMessageCodec<Object> codec = new PubSubMessageCodec<>(mapper); @Override - public String convertMapToMessage(Pair<String,Object> t) throws IOException { - return PubSubWebSocketClient.constructPublishMessage(t.getLeft(), t.getRight(), codec); - }; + public String convertMapToMessage(Pair<String,Object> t) throws IOException + { + return PubSubMessageCodec.constructPublishMessage(t.getLeft(), t.getRight(), codec); + } }; @@ -144,15 +144,14 @@ public class WidgetOutputOperator extends BaseOperator } - public static class TimeSeriesData{ - + public static class TimeSeriesData + { public Long time; - public Number data; - } - public static class TimeseriesInputPort extends DefaultInputPort<TimeSeriesData[]> { + public static class TimeseriesInputPort extends DefaultInputPort<TimeSeriesData[]> + { private final WidgetOutputOperator operator; @@ -174,8 +173,8 @@ public class WidgetOutputOperator extends BaseOperator timeseriesMapData[i++] = timeseriesMap; } - if(operator.isWebSocketConnected){ - HashMap<String, Object> schemaObj = new HashMap<String, Object>(); + if (operator.isWebSocketConnected) { + HashMap<String, Object> schemaObj = new HashMap<>(); schemaObj.put("type", "timeseries"); schemaObj.put("minValue", operator.timeSeriesMin); schemaObj.put("maxValue", operator.timeSeriesMax); @@ -185,18 +184,21 @@ public class WidgetOutputOperator extends BaseOperator } } - public TimeseriesInputPort setMax(Number max){ + public TimeseriesInputPort setMax(Number max) + { operator.timeSeriesMax = max; return this; } - public TimeseriesInputPort setMin(Number min){ + public TimeseriesInputPort setMin(Number min) + { operator.timeSeriesMin = min; return this; } - public TimeseriesInputPort setTopic(String topic){ + public TimeseriesInputPort setTopic(String topic) + { operator.timeSeriesTopic = topic; return this; } @@ -219,12 +221,12 @@ public class WidgetOutputOperator extends BaseOperator HashMap<String, Object>[] result = new HashMap[topNMap.size()]; int j = 0; for (Entry<String, Number> e : topNMap.entrySet()) { - result[j] = new HashMap<String, Object>(); + result[j] = new HashMap<>(); result[j].put("name", e.getKey()); result[j++].put("value", e.getValue()); } - if(operator.isWebSocketConnected){ - HashMap<String, Object> schemaObj = new HashMap<String, Object>(); + if (operator.isWebSocketConnected) { + HashMap<String, Object> schemaObj = new HashMap<>(); schemaObj.put("type", "topN"); schemaObj.put("n", operator.nInTopN); operator.wsoo.input.process(new MutablePair<String, Object>(operator.getFullTopic(operator.topNTopic, schemaObj), result)); @@ -233,7 +235,8 @@ public class WidgetOutputOperator extends BaseOperator } } - public TopNInputPort setN(int n){ + public TopNInputPort setN(int n) + { operator.nInTopN = n; return this; } @@ -246,7 +249,8 @@ public class WidgetOutputOperator extends BaseOperator } - public static class SimpleInputPort extends DefaultInputPort<Object>{ + public static class SimpleInputPort extends DefaultInputPort<Object> + { private final WidgetOutputOperator operator; @@ -258,7 +262,6 @@ public class WidgetOutputOperator extends BaseOperator @Override public void process(Object tuple) { - if (operator.isWebSocketConnected) { HashMap<String, Object> schemaObj = new HashMap<String, Object>(); schemaObj.put("type", "simple"); @@ -268,7 +271,8 @@ public class WidgetOutputOperator extends BaseOperator } } - public SimpleInputPort setTopic(String topic) { + public SimpleInputPort setTopic(String topic) + { operator.simpleTopic = topic; return this; } @@ -286,8 +290,8 @@ public class WidgetOutputOperator extends BaseOperator @Override public void process(Integer tuple) { - if(operator.isWebSocketConnected){ - HashMap<String, Object> schemaObj = new HashMap<String, Object>(); + if (operator.isWebSocketConnected) { + HashMap<String, Object> schemaObj = new HashMap<>(); schemaObj.put("type", "percentage"); operator.wsoo.input.process(new MutablePair<String, Object>(operator.getFullTopic(operator.percentageTopic, schemaObj), tuple)); } else { @@ -302,7 +306,8 @@ public class WidgetOutputOperator extends BaseOperator } } -public static class PiechartInputPort extends DefaultInputPort<HashMap<String, Number>>{ + public static class PiechartInputPort extends DefaultInputPort<HashMap<String, Number>> + { private final WidgetOutputOperator operator; @@ -319,12 +324,12 @@ public static class PiechartInputPort extends DefaultInputPort<HashMap<String, N int j = 0; for (Entry<String, Number> e : pieNumbers.entrySet()) { - result[j] = new HashMap<String, Object>(); + result[j] = new HashMap<>(); result[j].put("label", e.getKey()); result[j++].put("value", e.getValue()); } - if(operator.isWebSocketConnected){ - HashMap<String, Object> schemaObj = new HashMap<String, Object>(); + if (operator.isWebSocketConnected) { + HashMap<String, Object> schemaObj = new HashMap<>(); schemaObj.put("type", "piechart"); schemaObj.put("n", operator.nInPie); operator.wsoo.input.process(new MutablePair<String, Object>(operator.getFullTopic(operator.pieChartTopic, schemaObj), result)); @@ -333,7 +338,8 @@ public static class PiechartInputPort extends DefaultInputPort<HashMap<String, N } } - public PiechartInputPort setN(int n){ + public PiechartInputPort setN(int n) + { operator.nInPie = n; return this; } @@ -346,8 +352,9 @@ public static class PiechartInputPort extends DefaultInputPort<HashMap<String, N } - protected String getFullTopic(String topic, Map<String, Object> schema){ - HashMap<String, Object> topicObj = new HashMap<String, Object>(); + protected String getFullTopic(String topic, Map<String, Object> schema) + { + HashMap<String, Object> topicObj = new HashMap<>(); topicObj.put("appId", appId); topicObj.put("opId", operId); topicObj.put("topicName", topic); @@ -362,7 +369,7 @@ public static class PiechartInputPort extends DefaultInputPort<HashMap<String, N @Override public void teardown() { - if(isWebSocketConnected){ + if (isWebSocketConnected) { wsoo.teardown(); } else { coo.teardown(); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/188afbe8/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java b/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java index 3b36cb7..d8138d5 100644 --- a/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java +++ b/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java @@ -89,6 +89,11 @@ public class OperatorContextTestHelper /* intentionally no-op */ } + @Override + public int getWindowsFromCheckpoint() + { + return 0; + } } private static class TestContext implements Context http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/188afbe8/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d219d75..8a7416b 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ <parent> <groupId>org.apache.apex</groupId> <artifactId>apex</artifactId> - <version>3.3.0-incubating</version> + <version>3.4.0-incubating-SNAPSHOT</version> </parent> <groupId>org.apache.apex</groupId> @@ -49,7 +49,7 @@ <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.deploy.skip>false</maven.deploy.skip> - <apex.core.version>3.3.0-incubating</apex.core.version> + <apex.core.version>3.4.0-incubating-SNAPSHOT</apex.core.version> <semver.plugin.skip>false</semver.plugin.skip> <surefire.args>-Xmx2048m</surefire.args> </properties> @@ -110,7 +110,7 @@ <plugin> <groupId>com.github.siom79.japicmp</groupId> <artifactId>japicmp-maven-plugin</artifactId> - <version>0.7.0</version> + <version>0.7.1</version> <configuration> <oldVersion> <dependency> @@ -134,6 +134,8 @@ <excludes> <exclude>@org.apache.hadoop.classification.InterfaceStability$Evolving</exclude> <exclude>@org.apache.hadoop.classification.InterfaceStability$Unstable</exclude> + <exclude>com.datatorrent.lib.io.WebSocketInputOperator</exclude> + <exclude>com.datatorrent.lib.io.WebSocketOutputOperator</exclude> </excludes> </parameter> <skip>${semver.plugin.skip}</skip>