Add BaseRootExec, which enables collection of stats for senders. Add SenderStats, which collects stats specific to senders.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/9b22d2c3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/9b22d2c3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/9b22d2c3 Branch: refs/heads/master Commit: 9b22d2c37d638451149270a19fe1b2e63d8ea670 Parents: 3e98ffc Author: Mehant Baid <[email protected]> Authored: Fri May 23 10:29:48 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Sun Jun 8 19:13:06 2014 -0700 ---------------------------------------------------------------------- .../apache/drill/exec/ops/FragmentStats.java | 4 ++ .../org/apache/drill/exec/ops/OpProfileDef.java | 5 ++ .../apache/drill/exec/ops/OperatorContext.java | 13 ++-- .../apache/drill/exec/ops/OperatorStats.java | 32 ++++++--- .../org/apache/drill/exec/ops/SenderStats.java | 73 ++++++++++++++++++++ .../drill/exec/physical/impl/BaseRootExec.java | 21 +++--- .../PartitionSenderRootExec.java | 12 +++- .../partitionsender/PartitionSenderStats.java | 5 +- .../partitionsender/PartitionStatsBatch.java | 24 +++++++ .../impl/partitionsender/Partitioner.java | 3 + .../partitionsender/PartitionerTemplate.java | 15 +++- 11 files changed, 175 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java index d667794..19ac0aa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java @@ -54,4 +54,8 @@ public class FragmentStats { return stats; } + 
public void addOperatorStats(OperatorStats stats) { + operators.add(stats); + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java index fb68e4a..b5c8d86 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java @@ -23,6 +23,11 @@ public class OpProfileDef { public int operatorType; public int incomingCount; + public OpProfileDef(int operatorId, int operatorType, int incomingCount) { + this.operatorId = operatorId; + this.operatorType = operatorType; + this.incomingCount = incomingCount; + } public int getOperatorId(){ return operatorId; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java index 116b616..d62ea2f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java @@ -36,14 +36,17 @@ public class OperatorContext implements Closeable { this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation()); this.popConfig = popConfig; - OpProfileDef def = new OpProfileDef(); - def.operatorId = popConfig.getOperatorId(); - def.incomingCount = getChildCount(popConfig); - def.operatorType = popConfig.getOperatorType(); + OpProfileDef def = new 
OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), getChildCount(popConfig)); this.stats = context.getStats().getOperatorStats(def); } - private static int getChildCount(PhysicalOperator popConfig){ + public OperatorContext(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats) throws OutOfMemoryException { + this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation()); + this.popConfig = popConfig; + this.stats = stats; + } + + public static int getChildCount(PhysicalOperator popConfig){ Iterator<PhysicalOperator> iter = popConfig.iterator(); int i = 0; while(iter.hasNext()){ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java index cde1876..4ac8f74 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java @@ -28,8 +28,8 @@ import com.carrotsearch.hppc.IntLongOpenHashMap; public class OperatorStats { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorStats.class); - private final int operatorId; - private final int operatorType; + protected final int operatorId; + protected final int operatorType; private IntLongOpenHashMap longMetrics = new IntLongOpenHashMap(); private IntDoubleOpenHashMap doubleMetrics = new IntDoubleOpenHashMap(); @@ -42,8 +42,8 @@ public class OperatorStats { private boolean inProcessing = false; private boolean inSetup = false; - private long processingNanos; - private long setupNanos; + protected long processingNanos; + protected long setupNanos; private long processingMark; private long setupMark; @@ 
-105,23 +105,37 @@ public class OperatorStats { .setSetupNanos(setupNanos) // .setProcessNanos(processingNanos); + addAllMetrics(b); + + return b.build(); + } + + public void addAllMetrics(OperatorProfile.Builder builder) { + addStreamProfile(builder); + addLongMetrics(builder); + addDoubleMetrics(builder); + } + + public void addStreamProfile(OperatorProfile.Builder builder) { for(int i = 0; i < recordsReceivedByInput.length; i++){ - b.addInputProfile(StreamProfile.newBuilder().setBatches(batchesReceivedByInput[i]).setRecords(recordsReceivedByInput[i]).setSchemas(this.schemaCountByInput[i])); + builder.addInputProfile(StreamProfile.newBuilder().setBatches(batchesReceivedByInput[i]).setRecords(recordsReceivedByInput[i]).setSchemas(this.schemaCountByInput[i])); } + } + public void addLongMetrics(OperatorProfile.Builder builder) { for(int i =0; i < longMetrics.allocated.length; i++){ if(longMetrics.allocated[i]){ - b.addMetric(MetricValue.newBuilder().setMetricId(longMetrics.keys[i]).setLongValue(longMetrics.values[i])); + builder.addMetric(MetricValue.newBuilder().setMetricId(longMetrics.keys[i]).setLongValue(longMetrics.values[i])); } } + } + public void addDoubleMetrics(OperatorProfile.Builder builder) { for(int i =0; i < doubleMetrics.allocated.length; i++){ if(doubleMetrics.allocated[i]){ - b.addMetric(MetricValue.newBuilder().setMetricId(doubleMetrics.keys[i]).setDoubleValue(doubleMetrics.values[i])); + builder.addMetric(MetricValue.newBuilder().setMetricId(doubleMetrics.keys[i]).setDoubleValue(doubleMetrics.values[i])); } } - - return b.build(); } public void addLongStat(MetricDef metric, long value){ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SenderStats.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SenderStats.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SenderStats.java new file mode 100644 index 0000000..c766632 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SenderStats.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.ops; + +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.impl.partitionsender.PartitionStatsBatch; +import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderStats; +import org.apache.drill.exec.proto.UserBitShared; + +import java.util.List; + +public class SenderStats extends OperatorStats { + + long minReceiverRecordCount = 0; + long maxReceiverRecordCount = 0; + int nSenders = 0; + + public SenderStats(PhysicalOperator operator) { + super(new OpProfileDef(operator.getOperatorId(), operator.getOperatorType(), OperatorContext.getChildCount(operator))); + } + + public void updatePartitionStats(List<? 
extends PartitionStatsBatch> outgoing) { + + for (PartitionStatsBatch o : outgoing) { + long totalRecords = o.getTotalRecords(); + + minReceiverRecordCount = Math.min(minReceiverRecordCount, totalRecords); + maxReceiverRecordCount = Math.max(maxReceiverRecordCount, totalRecords); + } + nSenders = outgoing.size(); + } + + @Override + public UserBitShared.OperatorProfile getProfile() { + final UserBitShared.OperatorProfile.Builder b = UserBitShared.OperatorProfile // + .newBuilder() // + .setOperatorType(operatorType) // + .setOperatorId(operatorId) // + .setSetupNanos(setupNanos) // + .setProcessNanos(processingNanos); + + addAllMetrics(b); + + return b.build(); + + } + + public void addAllMetrics(UserBitShared.OperatorProfile.Builder b) { + super.addAllMetrics(b); + + b.addMetric(UserBitShared.MetricValue.newBuilder().setLongValue(minReceiverRecordCount). + setMetricId(PartitionSenderStats.MIN_RECORDS.metricId())); + b.addMetric(UserBitShared.MetricValue.newBuilder().setLongValue(maxReceiverRecordCount). 
+ setMetricId(PartitionSenderStats.MAX_RECORDS.metricId())); + b.addMetric(UserBitShared.MetricValue.newBuilder().setLongValue(nSenders) + .setMetricId(PartitionSenderStats.N_SENDERS.metricId())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java index 0db8c07..256c106 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java @@ -17,25 +17,16 @@ */ package org.apache.drill.exec.physical.impl; -import org.apache.drill.exec.memory.OutOfMemoryException; -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; -import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.record.RecordBatch; -public abstract class BaseRootExec<T extends PhysicalOperator> implements RootExec { +public abstract class BaseRootExec implements RootExec { - protected final OperatorStats stats; - protected final OperatorContext oContext; - - public BaseRootExec(FragmentContext context, T operator) throws OutOfMemoryException { - oContext = new OperatorContext(operator, context); - stats = oContext.getStats(); - } + protected OperatorStats stats = null; @Override public final boolean next() { + // Stats should have been initialized + assert stats != null; try { stats.startProcessing(); return innerNext(); @@ -44,5 +35,9 @@ public abstract class BaseRootExec<T extends PhysicalOperator> implements RootEx } } + public void setStats(OperatorStats stats) { + this.stats = stats; + } + public abstract 
boolean innerNext(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 5476a50..9be45d2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -32,11 +32,13 @@ import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.ops.SenderStats; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.BaseRootExec; import org.apache.drill.exec.physical.impl.RootExec; import org.apache.drill.exec.physical.impl.SendingAccountor; import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch; +import org.apache.drill.exec.physical.impl.partitionsender.PartitionerTemplate.OutgoingRecordBatch; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.record.*; @@ -48,7 +50,6 @@ import com.sun.codemodel.JExpression; import com.sun.codemodel.JType; import org.apache.drill.exec.vector.CopyUtil; - public class PartitionSenderRootExec extends BaseRootExec { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionSenderRootExec.class); @@ -56,24 +57,28 @@ public class PartitionSenderRootExec extends 
BaseRootExec { private HashPartitionSender operator; private Partitioner partitioner; private FragmentContext context; + private OperatorContext oContext; private boolean ok = true; private final SendingAccountor sendCount = new SendingAccountor(); private final int outGoingBatchCount; private final HashPartitionSender popConfig; private final StatusHandler statusHandler; - + private final SenderStats stats; public PartitionSenderRootExec(FragmentContext context, RecordBatch incoming, HashPartitionSender operator) throws OutOfMemoryException { - super(context, operator); this.incoming = incoming; this.operator = operator; this.context = context; this.outGoingBatchCount = operator.getDestinations().size(); this.popConfig = operator; this.statusHandler = new StatusHandler(sendCount, context); + this.stats = new SenderStats(operator); + context.getStats().addOperatorStats(this.stats); + setStats(stats); + this.oContext = new OperatorContext(operator, context, stats); } @Override @@ -140,6 +145,7 @@ public class PartitionSenderRootExec extends BaseRootExec { context.fail(e); return false; } + stats.updatePartitionStats(partitioner.getOutgoingBatches()); for (VectorWrapper v : incoming) { v.clear(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderStats.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderStats.java index 4790596..de5967f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderStats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderStats.java @@ -22,7 +22,10 @@ import 
org.apache.drill.exec.ops.MetricDef; public enum PartitionSenderStats implements MetricDef { BATCHES_SENT, - RECORDS_SENT; + RECORDS_SENT, + MIN_RECORDS, + MAX_RECORDS, + N_SENDERS; @Override public int metricId() { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionStatsBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionStatsBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionStatsBatch.java new file mode 100644 index 0000000..85ccffb --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionStatsBatch.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.partitionsender; + + +public interface PartitionStatsBatch { + + public long getTotalRecords(); +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java index 6958403..53528ba 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java @@ -25,8 +25,10 @@ import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.SendingAccountor; import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.physical.impl.partitionsender.PartitionStatsBatch; import java.io.IOException; +import java.util.List; public interface Partitioner { @@ -42,6 +44,7 @@ public interface Partitioner { public abstract void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException; public abstract void initialize(); public abstract void clear(); + public abstract List<? 
extends PartitionStatsBatch> getOutgoingBatches(); public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java index 510327a..1e6e71b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java @@ -66,6 +66,11 @@ public abstract class PartitionerTemplate implements Partitioner { public PartitionerTemplate() throws SchemaChangeException { } + @Override + public List<? 
extends PartitionStatsBatch> getOutgoingBatches() { + return outgoingBatches; + } + public final void setup(FragmentContext context, RecordBatch incoming, HashPartitionSender popConfig, @@ -189,7 +194,7 @@ public abstract class PartitionerTemplate implements Partitioner { public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") OutgoingRecordBatch[] outgoing) throws SchemaChangeException; public abstract int doEval(@Named("inIndex") int inIndex); - public class OutgoingRecordBatch implements VectorAccessible { + public class OutgoingRecordBatch implements PartitionStatsBatch, VectorAccessible { private final DataTunnel tunnel; private final HashPartitionSender operator; @@ -202,6 +207,7 @@ public abstract class PartitionerTemplate implements Partitioner { private boolean isLast = false; private BatchSchema outSchema; private int recordCount; + private int totalRecords; private OperatorStats stats; private static final int DEFAULT_RECORD_BATCH_SIZE = 20000; private static final int DEFAULT_VARIABLE_WIDTH_SIZE = 200; @@ -224,6 +230,7 @@ public abstract class PartitionerTemplate implements Partitioner { protected boolean copy(int inIndex) throws IOException { if (doEval(inIndex, recordCount)) { recordCount++; + totalRecords++; if (recordCount == DEFAULT_RECORD_BATCH_SIZE) { flush(); } @@ -330,6 +337,12 @@ public abstract class PartitionerTemplate implements Partitioner { return recordCount; } + + @Override + public long getTotalRecords() { + return totalRecords; + } + @Override public TypedFieldId getValueVectorId(SchemaPath path) { return vectorContainer.getValueVectorId(path);
