Repository: incubator-apex-core Updated Branches: refs/heads/release-3.2 f2fa255e1 -> 15b38a1f8
APEXCORE-296 #comment fixed the memory leak Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/15b38a1f Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/15b38a1f Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/15b38a1f Branch: refs/heads/release-3.2 Commit: 15b38a1f81b1740c78189981d8294e978d8e0a5e Parents: f2fa255 Author: Gaurav <gau...@datatorrent.com> Authored: Mon Jan 11 17:21:53 2016 -0800 Committer: Vlad Rozov <v.ro...@datatorrent.com> Committed: Tue May 10 17:42:26 2016 -0700 ---------------------------------------------------------------------- .../datatorrent/stram/StreamingContainerManager.java | 13 +++++++++---- .../stram/plan/physical/OperatorStatus.java | 2 +- 2 files changed, 10 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/15b38a1f/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index cafd5e4..7eb4c96 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -1026,6 +1026,14 @@ public class StreamingContainerManager implements PlanContext } } o.stats.lastWindowedStats = stats; + o.stats.operatorResponses = null; + if (!o.stats.responses.isEmpty()) { + o.stats.operatorResponses = new ArrayList<>(); + StatsListener.OperatorResponse operatorResponse; + while ((operatorResponse = o.stats.responses.poll()) != null) { + o.stats.operatorResponses.add(operatorResponse); + } + } if (o.stats.lastWindowedStats != null) { // call listeners only with non empty window list if (o.statsListeners != null) { @@ -1523,10 +1531,7 @@ public class StreamingContainerManager implements PlanContext LOG.debug(" Got back the response {} for the request {}", obj, obj.getResponseId()); } else { // This is to identify user requests - if (oper.stats.operatorResponses == null) { - oper.stats.operatorResponses = new ArrayList<StatsListener.OperatorResponse>(); - } - oper.stats.operatorResponses.add(obj); + oper.stats.responses.add(obj); } } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/15b38a1f/engine/src/main/java/com/datatorrent/stram/plan/physical/OperatorStatus.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/OperatorStatus.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/OperatorStatus.java index 4548173..063203e 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/OperatorStatus.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/OperatorStatus.java @@ -34,7 +34,6 @@ import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHea import com.datatorrent.stram.engine.OperatorContext; import com.datatorrent.stram.plan.logical.LogicalPlan; import com.datatorrent.stram.plan.physical.StatsRevisions.VersionedLong; -import com.datatorrent.stram.util.MovingAverage; import com.datatorrent.stram.util.MovingAverage.MovingAverageLong; import com.datatorrent.stram.util.MovingAverage.TimedMovingAverageLong; @@ -83,6 +82,7 @@ public class OperatorStatus implements BatchedOperatorStats, java.io.Serializabl public final ConcurrentLinkedQueue<List<OperatorStats>> listenerStats = new ConcurrentLinkedQueue<List<OperatorStats>>(); public volatile long lastWindowIdChangeTms = 0; public final int windowProcessingTimeoutMillis; + public final ConcurrentLinkedQueue<StatsListener.OperatorResponse> responses = new ConcurrentLinkedQueue<>(); public List<StatsListener.OperatorResponse> operatorResponses; private final LogicalPlan.OperatorMeta operatorMeta;