APEXCORE-296 #comment fixed the memory leak
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d82d7669
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d82d7669
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d82d7669

Branch: refs/heads/master
Commit: d82d76698873facf58567ef42b84cad75ef8e810
Parents: 59770d0
Author: Gaurav <[email protected]>
Authored: Mon Jan 11 17:21:53 2016 -0800
Committer: Gaurav <[email protected]>
Committed: Fri Jan 15 09:33:30 2016 -0800

----------------------------------------------------------------------
 .../datatorrent/stram/StreamingContainerManager.java | 13 +++++++++----
 .../stram/plan/physical/OperatorStatus.java | 2 +-
 2 files changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d82d7669/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 162245b..4b79589 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -1027,6 +1027,14 @@ public class StreamingContainerManager implements PlanContext
           }
         }
         o.stats.lastWindowedStats = stats;
+        o.stats.operatorResponses = null;
+        if (!o.stats.responses.isEmpty()) {
+          o.stats.operatorResponses = new ArrayList<>();
+          StatsListener.OperatorResponse operatorResponse;
+          while ((operatorResponse = o.stats.responses.poll()) != null) {
+            o.stats.operatorResponses.add(operatorResponse);
+          }
+        }
         if (o.stats.lastWindowedStats != null) {
           // call listeners only with non empty window list
           if (o.statsListeners != null) {
@@ -1524,10 +1532,7 @@ public class StreamingContainerManager implements PlanContext
             LOG.debug(" Got back the response {} for the request {}", obj, obj.getResponseId());
           } else {
             // This is to identify user requests
-            if (oper.stats.operatorResponses == null) {
-              oper.stats.operatorResponses = new ArrayList<StatsListener.OperatorResponse>();
-            }
-            oper.stats.operatorResponses.add(obj);
+            oper.stats.responses.add(obj);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d82d7669/engine/src/main/java/com/datatorrent/stram/plan/physical/OperatorStatus.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/OperatorStatus.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/OperatorStatus.java
index 4548173..063203e 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/OperatorStatus.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/OperatorStatus.java
@@ -34,7 +34,6 @@ import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHea
 import com.datatorrent.stram.engine.OperatorContext;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.physical.StatsRevisions.VersionedLong;
-import com.datatorrent.stram.util.MovingAverage;
 import com.datatorrent.stram.util.MovingAverage.MovingAverageLong;
 import com.datatorrent.stram.util.MovingAverage.TimedMovingAverageLong;
 
@@ -83,6 +82,7 @@ public class OperatorStatus implements BatchedOperatorStats, java.io.Serializabl
   public final ConcurrentLinkedQueue<List<OperatorStats>> listenerStats = new ConcurrentLinkedQueue<List<OperatorStats>>();
   public volatile long lastWindowIdChangeTms = 0;
   public final int windowProcessingTimeoutMillis;
+  public final ConcurrentLinkedQueue<StatsListener.OperatorResponse> responses = new ConcurrentLinkedQueue<>();
   public List<StatsListener.OperatorResponse> operatorResponses;
   private final LogicalPlan.OperatorMeta operatorMeta;
