APEXCORE-296 #comment fixed the memory leak

Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d82d7669
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d82d7669
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d82d7669

Branch: refs/heads/master
Commit: d82d76698873facf58567ef42b84cad75ef8e810
Parents: 59770d0
Author: Gaurav <[email protected]>
Authored: Mon Jan 11 17:21:53 2016 -0800
Committer: Gaurav <[email protected]>
Committed: Fri Jan 15 09:33:30 2016 -0800

----------------------------------------------------------------------
 .../datatorrent/stram/StreamingContainerManager.java   | 13 +++++++++----
 .../stram/plan/physical/OperatorStatus.java            |  2 +-
 2 files changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d82d7669/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 162245b..4b79589 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -1027,6 +1027,14 @@ public class StreamingContainerManager implements PlanContext
         }
       }
       o.stats.lastWindowedStats = stats;
+      o.stats.operatorResponses = null;
+      if (!o.stats.responses.isEmpty()) {
+        o.stats.operatorResponses = new ArrayList<>();
+        StatsListener.OperatorResponse operatorResponse;
+        while ((operatorResponse = o.stats.responses.poll()) != null) {
+          o.stats.operatorResponses.add(operatorResponse);
+        }
+      }
       if (o.stats.lastWindowedStats != null) {
         // call listeners only with non empty window list
         if (o.statsListeners != null) {
@@ -1524,10 +1532,7 @@ public class StreamingContainerManager implements PlanContext
             LOG.debug(" Got back the response {} for the request {}", obj, obj.getResponseId());
           }
           else {       // This is to identify user requests
-            if (oper.stats.operatorResponses == null) {
-              oper.stats.operatorResponses = new ArrayList<StatsListener.OperatorResponse>();
-            }
-            oper.stats.operatorResponses.add(obj);
+            oper.stats.responses.add(obj);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d82d7669/engine/src/main/java/com/datatorrent/stram/plan/physical/OperatorStatus.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/OperatorStatus.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/OperatorStatus.java
index 4548173..063203e 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/OperatorStatus.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/OperatorStatus.java
@@ -34,7 +34,6 @@ import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHea
 import com.datatorrent.stram.engine.OperatorContext;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.physical.StatsRevisions.VersionedLong;
-import com.datatorrent.stram.util.MovingAverage;
 import com.datatorrent.stram.util.MovingAverage.MovingAverageLong;
 import com.datatorrent.stram.util.MovingAverage.TimedMovingAverageLong;
 
@@ -83,6 +82,7 @@ public class OperatorStatus implements BatchedOperatorStats, java.io.Serializabl
   public final ConcurrentLinkedQueue<List<OperatorStats>> listenerStats = new ConcurrentLinkedQueue<List<OperatorStats>>();
   public volatile long lastWindowIdChangeTms = 0;
   public final int windowProcessingTimeoutMillis;
+  public final ConcurrentLinkedQueue<StatsListener.OperatorResponse> responses = new ConcurrentLinkedQueue<>();
   public List<StatsListener.OperatorResponse> operatorResponses;
 
   private final LogicalPlan.OperatorMeta operatorMeta;

Reply via email to