Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master 79c643b95 -> e38e144f2


APEXMALHAR-2067 Necessary changes to work with Apex Core 3.4.0 due to ning.com:async-http-client and OperatorContext


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/188afbe8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/188afbe8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/188afbe8

Branch: refs/heads/master
Commit: 188afbe84982fe91252d8af3e617686ab711ae30
Parents: 79c643b
Author: David Yan <da...@datatorrent.com>
Authored: Thu Mar 17 15:15:31 2016 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Tue May 10 10:58:10 2016 -0700

----------------------------------------------------------------------
 .../HBaseTransactionalPutOperatorTest.java      | 18 +++++
 library/pom.xml                                 |  5 ++
 .../lib/io/PubSubWebSocketInputOperator.java    |  3 +-
 .../lib/io/PubSubWebSocketOutputOperator.java   |  3 +-
 .../lib/io/WebSocketInputOperator.java          | 16 ++---
 .../lib/io/WebSocketOutputOperator.java         | 38 +++++------
 .../lib/io/WidgetOutputOperator.java            | 71 +++++++++++---------
 .../lib/helper/OperatorContextTestHelper.java   |  5 ++
 pom.xml                                         |  8 ++-
 9 files changed, 96 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/188afbe8/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java
index e34c482..eef69d4 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java
@@ -86,6 +86,12 @@ public class HBaseTransactionalPutOperatorTest {
         public void sendMetrics(Collection<String> collection)
         {
         }
+
+        @Override
+        public int getWindowsFromCheckpoint()
+        {
+          return 0;
+        }
       });
       thop.beginWindow(0);
       thop.input.process(t1);
@@ -150,6 +156,12 @@ public class HBaseTransactionalPutOperatorTest {
         public void sendMetrics(Collection<String> collection)
         {
         }
+
+        @Override
+        public int getWindowsFromCheckpoint()
+        {
+          return 0;
+        }
       });
       thop.beginWindow(0);
       thop.input.process(t1);
@@ -218,6 +230,12 @@ public class HBaseTransactionalPutOperatorTest {
         public void sendMetrics(Collection<String> collection)
         {
         }
+
+        @Override
+        public int getWindowsFromCheckpoint()
+        {
+          return 0;
+        }
       });
 
       

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/188afbe8/library/pom.xml
----------------------------------------------------------------------
diff --git a/library/pom.xml b/library/pom.xml
index d11d80c..cd4d2a4 100644
--- a/library/pom.xml
+++ b/library/pom.xml
@@ -334,6 +334,11 @@
       <artifactId>fastutil</artifactId>
       <version>7.0.6</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>apex-shaded-ning19</artifactId>
+      <version>1.0.0</version>
+    </dependency>
 
   </dependencies>
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/188afbe8/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketInputOperator.java
index cdf2ee5..0b56924 100644
--- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketInputOperator.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.datatorrent.common.util.PubSubMessageCodec;
-import com.datatorrent.common.util.PubSubWebSocketClient;
 
 /**
  * This operator reads JSON objects from the given URL and converts them into 
maps.
@@ -82,7 +81,7 @@ public class PubSubWebSocketInputOperator<T> extends 
WebSocketInputOperator<T>
   {
     super.run();
     try {
-      
connection.sendTextMessage(PubSubWebSocketClient.constructSubscribeMessage(topic,
 codec));
+      
connection.sendMessage(PubSubMessageCodec.constructSubscribeMessage(topic, 
codec));
     }
     catch (IOException ex) {
       LOG.error("Exception caught", ex);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/188afbe8/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketOutputOperator.java
index b704191..c51c0c9 100644
--- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketOutputOperator.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import javax.validation.constraints.NotNull;
 
 import com.datatorrent.common.util.PubSubMessageCodec;
-import com.datatorrent.common.util.PubSubWebSocketClient;
 
 /**
  * This operator writes maps as JSON objects to the given URL.
@@ -63,7 +62,7 @@ public class PubSubWebSocketOutputOperator<T> extends 
WebSocketOutputOperator<T>
   @Override
   public String convertMapToMessage(T t) throws IOException
   {
-    return PubSubWebSocketClient.constructPublishMessage(topic, t, codec);
+    return PubSubMessageCodec.constructPublishMessage(topic, t, codec);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/188afbe8/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
index a2e7165..f805dcf 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
@@ -31,11 +31,11 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.commons.lang3.ClassUtils;
 
-import com.ning.http.client.AsyncHttpClient;
-import com.ning.http.client.AsyncHttpClientConfigBean;
-import com.ning.http.client.websocket.WebSocket;
-import com.ning.http.client.websocket.WebSocketTextListener;
-import com.ning.http.client.websocket.WebSocketUpgradeHandler;
+import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHttpClient;
+import 
org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHttpClientConfigBean;
+import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocket;
+import 
org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketTextListener;
+import 
org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketUpgradeHandler;
 
 import com.datatorrent.api.Context.OperatorContext;
 
@@ -223,12 +223,6 @@ public class WebSocketInputOperator<T> extends 
SimpleSinglePortInputOperator<T>
         }
 
         @Override
-        public void onFragment(String string, boolean bln)
-        {
-          LOG.debug("onFragment");
-        }
-
-        @Override
         public void onOpen(WebSocket ws)
         {
           LOG.debug("Connection opened");

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/188afbe8/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
index 134c913..b793183 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
@@ -20,20 +20,24 @@ package com.datatorrent.lib.io;
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.concurrent.*;
-
-import com.ning.http.client.AsyncHttpClient;
-import com.ning.http.client.AsyncHttpClientConfigBean;
-import com.ning.http.client.websocket.WebSocket;
-import com.ning.http.client.websocket.WebSocketTextListener;
-import com.ning.http.client.websocket.WebSocketUpgradeHandler;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHttpClient;
+import 
org.apache.apex.shaded.ning19.com.ning.http.client.AsyncHttpClientConfigBean;
+import org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocket;
+import 
org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketTextListener;
+import 
org.apache.apex.shaded.ning19.com.ning.http.client.ws.WebSocketUpgradeHandler;
 import org.apache.commons.lang3.ClassUtils;
+
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
@@ -159,10 +163,9 @@ public class WebSocketOutputOperator<T> extends 
BaseOperator
             client.close();
             openConnection();
           }
-          connection.sendTextMessage(convertMapToMessage(t));
+          connection.sendMessage(convertMapToMessage(t));
           break;
-        }
-        catch (Exception ex) {
+        } catch (Exception ex) {
           if (++countTries < numRetries) {
             LOG.debug("Caught exception", ex);
             LOG.warn("Send message failed ({}). Retrying ({}).", 
ex.getMessage(), countTries);
@@ -172,12 +175,11 @@ public class WebSocketOutputOperator<T> extends 
BaseOperator
             if (waitMillisRetry > 0) {
               try {
                 Thread.sleep(waitMillisRetry);
-              }
-              catch (InterruptedException ex1) {
+              } catch (InterruptedException ex1) {
+                // continue
               }
             }
-          }
-          else {
+          } else {
             throw new RuntimeException(ex);
           }
         }
@@ -212,11 +214,6 @@ public class WebSocketOutputOperator<T> extends 
BaseOperator
       }
 
       @Override
-      public void onFragment(String string, boolean bln)
-      {
-      }
-
-      @Override
       public void onOpen(WebSocket ws)
       {
         LOG.debug("Connection opened");
@@ -242,8 +239,7 @@ public class WebSocketOutputOperator<T> extends BaseOperator
   {
     try {
       openConnection();
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       LOG.warn("Cannot establish connection:", ex);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/188afbe8/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java
index df8b5af..5b6259c 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java
@@ -38,7 +38,6 @@ import 
com.datatorrent.api.annotation.InputPortFieldAnnotation;
 
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.common.util.PubSubMessageCodec;
-import com.datatorrent.common.util.PubSubWebSocketClient;
 
 /**
  * This operator outputs data in a format that can be displayed in DT UI 
widgets.
@@ -63,12 +62,13 @@ public class WidgetOutputOperator extends BaseOperator
 {
   protected transient WebSocketOutputOperator<Pair<String, Object>> wsoo = new 
WebSocketOutputOperator<Pair<String,Object>>(){
 
-    private transient PubSubMessageCodec<Object> codec = new 
PubSubMessageCodec<Object>(mapper);
+    private transient PubSubMessageCodec<Object> codec = new 
PubSubMessageCodec<>(mapper);
 
     @Override
-    public String convertMapToMessage(Pair<String,Object> t) throws 
IOException {
-      return PubSubWebSocketClient.constructPublishMessage(t.getLeft(), 
t.getRight(), codec);
-    };
+    public String convertMapToMessage(Pair<String,Object> t) throws IOException
+    {
+      return PubSubMessageCodec.constructPublishMessage(t.getLeft(), 
t.getRight(), codec);
+    }
 
   };
 
@@ -144,15 +144,14 @@ public class WidgetOutputOperator extends BaseOperator
 
   }
 
-  public static class TimeSeriesData{
-
+  public static class TimeSeriesData
+  {
     public Long time;
-
     public Number data;
-
   }
 
-  public static class TimeseriesInputPort extends 
DefaultInputPort<TimeSeriesData[]> {
+  public static class TimeseriesInputPort extends 
DefaultInputPort<TimeSeriesData[]>
+  {
 
     private final WidgetOutputOperator operator;
 
@@ -174,8 +173,8 @@ public class WidgetOutputOperator extends BaseOperator
         timeseriesMapData[i++] = timeseriesMap;
       }
 
-      if(operator.isWebSocketConnected){
-        HashMap<String, Object> schemaObj = new HashMap<String, Object>();
+      if (operator.isWebSocketConnected) {
+        HashMap<String, Object> schemaObj = new HashMap<>();
         schemaObj.put("type", "timeseries");
         schemaObj.put("minValue", operator.timeSeriesMin);
         schemaObj.put("maxValue", operator.timeSeriesMax);
@@ -185,18 +184,21 @@ public class WidgetOutputOperator extends BaseOperator
       }
     }
 
-    public TimeseriesInputPort setMax(Number max){
+    public TimeseriesInputPort setMax(Number max)
+    {
       operator.timeSeriesMax = max;
       return this;
     }
 
 
-    public TimeseriesInputPort setMin(Number min){
+    public TimeseriesInputPort setMin(Number min)
+    {
       operator.timeSeriesMin = min;
       return this;
     }
 
-    public TimeseriesInputPort setTopic(String topic){
+    public TimeseriesInputPort setTopic(String topic)
+    {
       operator.timeSeriesTopic = topic;
       return this;
     }
@@ -219,12 +221,12 @@ public class WidgetOutputOperator extends BaseOperator
       HashMap<String, Object>[] result = new HashMap[topNMap.size()];
       int j = 0;
       for (Entry<String, Number> e : topNMap.entrySet()) {
-        result[j] = new HashMap<String, Object>();
+        result[j] = new HashMap<>();
         result[j].put("name", e.getKey());
         result[j++].put("value", e.getValue());
       }
-      if(operator.isWebSocketConnected){
-        HashMap<String, Object> schemaObj = new HashMap<String, Object>();
+      if (operator.isWebSocketConnected) {
+        HashMap<String, Object> schemaObj = new HashMap<>();
         schemaObj.put("type", "topN");
         schemaObj.put("n", operator.nInTopN);
         operator.wsoo.input.process(new MutablePair<String, 
Object>(operator.getFullTopic(operator.topNTopic, schemaObj), result));
@@ -233,7 +235,8 @@ public class WidgetOutputOperator extends BaseOperator
       }
     }
 
-    public TopNInputPort setN(int n){
+    public TopNInputPort setN(int n)
+    {
       operator.nInTopN = n;
       return this;
     }
@@ -246,7 +249,8 @@ public class WidgetOutputOperator extends BaseOperator
 
   }
 
-  public static class SimpleInputPort extends DefaultInputPort<Object>{
+  public static class SimpleInputPort extends DefaultInputPort<Object>
+  {
 
     private final WidgetOutputOperator operator;
 
@@ -258,7 +262,6 @@ public class WidgetOutputOperator extends BaseOperator
     @Override
     public void process(Object tuple)
     {
-
       if (operator.isWebSocketConnected) {
         HashMap<String, Object> schemaObj = new HashMap<String, Object>();
         schemaObj.put("type", "simple");
@@ -268,7 +271,8 @@ public class WidgetOutputOperator extends BaseOperator
       }
     }
 
-    public SimpleInputPort setTopic(String topic) {
+    public SimpleInputPort setTopic(String topic)
+    {
       operator.simpleTopic = topic;
       return this;
     }
@@ -286,8 +290,8 @@ public class WidgetOutputOperator extends BaseOperator
     @Override
     public void process(Integer tuple)
     {
-      if(operator.isWebSocketConnected){
-        HashMap<String, Object> schemaObj = new HashMap<String, Object>();
+      if (operator.isWebSocketConnected) {
+        HashMap<String, Object> schemaObj = new HashMap<>();
         schemaObj.put("type", "percentage");
         operator.wsoo.input.process(new MutablePair<String, 
Object>(operator.getFullTopic(operator.percentageTopic, schemaObj), tuple));
       } else {
@@ -302,7 +306,8 @@ public class WidgetOutputOperator extends BaseOperator
     }
   }
 
-public static class PiechartInputPort extends DefaultInputPort<HashMap<String, 
Number>>{
+  public static class PiechartInputPort extends 
DefaultInputPort<HashMap<String, Number>>
+  {
 
     private final WidgetOutputOperator operator;
 
@@ -319,12 +324,12 @@ public static class PiechartInputPort extends 
DefaultInputPort<HashMap<String, N
 
       int j = 0;
       for (Entry<String, Number> e : pieNumbers.entrySet()) {
-        result[j] = new HashMap<String, Object>();
+        result[j] = new HashMap<>();
         result[j].put("label", e.getKey());
         result[j++].put("value", e.getValue());
       }
-      if(operator.isWebSocketConnected){
-        HashMap<String, Object> schemaObj = new HashMap<String, Object>();
+      if (operator.isWebSocketConnected) {
+        HashMap<String, Object> schemaObj = new HashMap<>();
         schemaObj.put("type", "piechart");
         schemaObj.put("n", operator.nInPie);
         operator.wsoo.input.process(new MutablePair<String, 
Object>(operator.getFullTopic(operator.pieChartTopic, schemaObj), result));
@@ -333,7 +338,8 @@ public static class PiechartInputPort extends 
DefaultInputPort<HashMap<String, N
       }
     }
 
-    public PiechartInputPort setN(int n){
+    public PiechartInputPort setN(int n)
+    {
       operator.nInPie = n;
       return this;
     }
@@ -346,8 +352,9 @@ public static class PiechartInputPort extends 
DefaultInputPort<HashMap<String, N
 
   }
 
-  protected String getFullTopic(String topic, Map<String, Object> schema){
-    HashMap<String, Object> topicObj = new HashMap<String, Object>();
+  protected String getFullTopic(String topic, Map<String, Object> schema)
+  {
+    HashMap<String, Object> topicObj = new HashMap<>();
     topicObj.put("appId", appId);
     topicObj.put("opId", operId);
     topicObj.put("topicName", topic);
@@ -362,7 +369,7 @@ public static class PiechartInputPort extends 
DefaultInputPort<HashMap<String, N
   @Override
   public void teardown()
   {
-    if(isWebSocketConnected){
+    if (isWebSocketConnected) {
       wsoo.teardown();
     } else {
       coo.teardown();

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/188afbe8/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java b/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java
index 3b36cb7..d8138d5 100644
--- a/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java
+++ b/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java
@@ -89,6 +89,11 @@ public class OperatorContextTestHelper
       /* intentionally no-op */
     }
 
+    @Override
+    public int getWindowsFromCheckpoint()
+    {
+      return 0;
+    }
   }
 
   private static class TestContext implements Context

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/188afbe8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d219d75..8a7416b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,7 +25,7 @@
   <parent>
     <groupId>org.apache.apex</groupId>
     <artifactId>apex</artifactId>
-    <version>3.3.0-incubating</version>
+    <version>3.4.0-incubating-SNAPSHOT</version>
   </parent>
 
   <groupId>org.apache.apex</groupId>
@@ -49,7 +49,7 @@
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <maven.deploy.skip>false</maven.deploy.skip>
-    <apex.core.version>3.3.0-incubating</apex.core.version>
+    <apex.core.version>3.4.0-incubating-SNAPSHOT</apex.core.version>
     <semver.plugin.skip>false</semver.plugin.skip>
     <surefire.args>-Xmx2048m</surefire.args>
   </properties>
@@ -110,7 +110,7 @@
         <plugin>
           <groupId>com.github.siom79.japicmp</groupId>
           <artifactId>japicmp-maven-plugin</artifactId>
-          <version>0.7.0</version>
+          <version>0.7.1</version>
           <configuration>
             <oldVersion>
               <dependency>
@@ -134,6 +134,8 @@
               <excludes>
                 
<exclude>@org.apache.hadoop.classification.InterfaceStability$Evolving</exclude>
                 
<exclude>@org.apache.hadoop.classification.InterfaceStability$Unstable</exclude>
+                
<exclude>com.datatorrent.lib.io.WebSocketInputOperator</exclude>
+                
<exclude>com.datatorrent.lib.io.WebSocketOutputOperator</exclude>
               </excludes>
             </parameter>
             <skip>${semver.plugin.skip}</skip>

Reply via email to