DRILL-6197: Skip duplicate entry for OperatorStats org.apache.drill.exec.ops.FragmentStats should skip injecting the org.apache.drill.exec.ops.OperatorStats instance for these operators: org.apache.drill.exec.proto.beans.CoreOperatorType.SCREEN org.apache.drill.exec.proto.beans.CoreOperatorType.SINGLE_SENDER org.apache.drill.exec.proto.beans.CoreOperatorType.BROADCAST_SENDER org.apache.drill.exec.proto.beans.CoreOperatorType.HASH_PARTITION_SENDER
closes #1141 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6af651fc Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6af651fc Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6af651fc Branch: refs/heads/master Commit: 6af651fcde8059dbf557a1f2f897557425fb950d Parents: 161a046 Author: Kunal Khatua <[email protected]> Authored: Wed Feb 28 14:07:27 2018 -0800 Committer: Arina Ielchiieva <[email protected]> Committed: Sat Mar 3 19:47:48 2018 +0200 ---------------------------------------------------------------------- .../apache/drill/exec/ops/FragmentStats.java | 20 +++++++++++--------- .../drill/exec/physical/impl/BaseRootExec.java | 16 ++++++++-------- 2 files changed, 19 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/6af651fc/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java index a173073..cdad6e4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java @@ -17,21 +17,22 @@ */ package org.apache.drill.exec.ops; -import java.util.List; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile; -import com.google.common.collect.Lists; - /** * Holds statistics of a particular (minor) fragment. */ public class FragmentStats { // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStats.class); - private List<OperatorStats> operators = Lists.newArrayList(); + private Map<ImmutablePair<Integer, Integer>, OperatorStats> operators = new LinkedHashMap<>(); private final long startTime; private final DrillbitEndpoint endpoint; private final BufferAllocator allocator; @@ -47,8 +48,8 @@ public class FragmentStats { prfB.setMaxMemoryUsed(allocator.getPeakMemoryAllocation()); prfB.setEndTime(System.currentTimeMillis()); prfB.setEndpoint(endpoint); - for(OperatorStats o : operators){ - prfB.addOperatorProfile(o.getProfile()); + for(Entry<ImmutablePair<Integer, Integer>, OperatorStats> o : operators.entrySet()){ + prfB.addOperatorProfile(o.getValue().getProfile()); } } @@ -62,13 +63,14 @@ public class FragmentStats { public OperatorStats newOperatorStats(final OpProfileDef profileDef, final BufferAllocator allocator) { final OperatorStats stats = new OperatorStats(profileDef, allocator); if(profileDef.operatorType != -1) { - operators.add(stats); + @SuppressWarnings("unused") + OperatorStats existingStatsHolder = addOperatorStats(stats); } return stats; } - public void addOperatorStats(OperatorStats stats) { - operators.add(stats); + public OperatorStats addOperatorStats(OperatorStats stats) { + return operators.put(new ImmutablePair<>(stats.operatorId, stats.operatorType), stats); } } http://git-wip-us.apache.org/repos/asf/drill/blob/6af651fc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java index 82887ec..bf52d04 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java @@ -43,20 +43,20 @@ public abstract class BaseRootExec implements RootExec { private List<CloseableRecordBatch> operators; public BaseRootExec(final RootFragmentContext fragmentContext, final PhysicalOperator config) throws OutOfMemoryException { - this.oContext = fragmentContext.newOperatorContext(config, stats); - stats = new OperatorStats(new OpProfileDef(config.getOperatorId(), - config.getOperatorType(), OperatorUtilities.getChildCount(config)), - oContext.getAllocator()); - fragmentContext.getStats().addOperatorStats(this.stats); - this.fragmentContext = fragmentContext; + this(fragmentContext, null, config); } public BaseRootExec(final RootFragmentContext fragmentContext, final OperatorContext oContext, final PhysicalOperator config) throws OutOfMemoryException { - this.oContext = oContext; + if (oContext == null) { + this.oContext = fragmentContext.newOperatorContext(config, stats); + } else { + this.oContext = oContext; + } + //Creating new stat for appending to list stats = new OperatorStats(new OpProfileDef(config.getOperatorId(), config.getOperatorType(), OperatorUtilities.getChildCount(config)), - oContext.getAllocator()); + this.oContext.getAllocator()); fragmentContext.getStats().addOperatorStats(this.stats); this.fragmentContext = fragmentContext; }
