gortiz commented on code in PR #12704:
URL: https://github.com/apache/pinot/pull/12704#discussion_r1580748812
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java:
##########
@@ -19,107 +19,295 @@
package org.apache.pinot.query.runtime.operator;
import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import java.io.DataInput;
+import java.io.IOException;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerOperator;
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.apache.pinot.spi.trace.InvocationScope;
import org.apache.pinot.spi.trace.Tracing;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public abstract class MultiStageOperator implements
Operator<TransferableBlock>, AutoCloseable {
- private static final Logger LOGGER =
LoggerFactory.getLogger(MultiStageOperator.class);
+public abstract class MultiStageOperator<K extends Enum<K> & StatMap.Key>
+ implements Operator<TransferableBlock>, AutoCloseable {
protected final OpChainExecutionContext _context;
protected final String _operatorId;
- protected final OpChainStats _opChainStats;
+ protected final StatMap<K> _statMap;
protected boolean _isEarlyTerminated;
- public MultiStageOperator(OpChainExecutionContext context) {
+ public MultiStageOperator(OpChainExecutionContext context, Class<K>
keyStatClass) {
_context = context;
_operatorId = Joiner.on("_").join(getClass().getSimpleName(),
_context.getStageId(), _context.getServer());
- _opChainStats = _context.getStats();
_isEarlyTerminated = false;
+ _statMap = new StatMap<>(keyStatClass);
}
+ protected abstract Logger logger();
+
+ public abstract Type getOperatorType();
+
+ public abstract K getExecutionTimeKey();
+
+ public abstract K getEmittedRowsKey();
+
@Override
public TransferableBlock nextBlock() {
if (Tracing.ThreadAccountantOps.isInterrupted()) {
throw new EarlyTerminationException("Interrupted while processing next
block");
}
+ if (logger().isDebugEnabled()) {
+ logger().debug("Operator {}: Reading next block", _operatorId);
+ }
try (InvocationScope ignored =
Tracing.getTracer().createScope(getClass())) {
TransferableBlock nextBlock;
if (shouldCollectStats()) {
- OperatorStats operatorStats = _opChainStats.getOperatorStats(_context,
_operatorId);
- operatorStats.startTimer();
+ Stopwatch executeStopwatch = Stopwatch.createStarted();
Review Comment:
I've finally decided to always collect stats, because the conditional
collection (`shouldCollectStats()`) was difficult to explain and the cost of
always collecting them is negligible.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]