gortiz commented on code in PR #12704:
URL: https://github.com/apache/pinot/pull/12704#discussion_r1565423170
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java:
##########
@@ -19,107 +19,295 @@
package org.apache.pinot.query.runtime.operator;
import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import java.io.DataInput;
+import java.io.IOException;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerOperator;
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.apache.pinot.spi.trace.InvocationScope;
import org.apache.pinot.spi.trace.Tracing;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public abstract class MultiStageOperator implements
Operator<TransferableBlock>, AutoCloseable {
- private static final Logger LOGGER =
LoggerFactory.getLogger(MultiStageOperator.class);
+public abstract class MultiStageOperator<K extends Enum<K> & StatMap.Key>
+ implements Operator<TransferableBlock>, AutoCloseable {
protected final OpChainExecutionContext _context;
protected final String _operatorId;
- protected final OpChainStats _opChainStats;
+ protected final StatMap<K> _statMap;
protected boolean _isEarlyTerminated;
- public MultiStageOperator(OpChainExecutionContext context) {
+ public MultiStageOperator(OpChainExecutionContext context, Class<K>
keyStatClass) {
_context = context;
_operatorId = Joiner.on("_").join(getClass().getSimpleName(),
_context.getStageId(), _context.getServer());
- _opChainStats = _context.getStats();
_isEarlyTerminated = false;
+ _statMap = new StatMap<>(keyStatClass);
}
+ protected abstract Logger logger();
+
+ public abstract Type getOperatorType();
+
+ public abstract K getExecutionTimeKey();
+
+ public abstract K getEmittedRowsKey();
+
@Override
public TransferableBlock nextBlock() {
if (Tracing.ThreadAccountantOps.isInterrupted()) {
throw new EarlyTerminationException("Interrupted while processing next
block");
}
+ if (logger().isDebugEnabled()) {
+ logger().debug("Operator {}: Reading next block", _operatorId);
+ }
try (InvocationScope ignored =
Tracing.getTracer().createScope(getClass())) {
TransferableBlock nextBlock;
if (shouldCollectStats()) {
- OperatorStats operatorStats = _opChainStats.getOperatorStats(_context,
_operatorId);
- operatorStats.startTimer();
+ Stopwatch executeStopwatch = Stopwatch.createStarted();
Review Comment:
We discussed whether we want to always collect time stats or not. Right
now this PR does not modify that, but doing so should be as easy as always
returning true in `shouldCollectStats` or just removing this `if`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]