This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4f98146e83 [enhancement](tracing) Support forward to master tracing
(#12290)
4f98146e83 is described below
commit 4f98146e83e55f554604c3a99efc27f816c7130b
Author: abmdocrt <[email protected]>
AuthorDate: Sun Sep 18 17:39:04 2022 +0800
[enhancement](tracing) Support forward to master tracing (#12290)
---
.../java/org/apache/doris/qe/ConnectProcessor.java | 51 ++++++++++++++++++++--
.../java/org/apache/doris/qe/MasterOpExecutor.java | 26 ++++++++++-
.../java/org/apache/doris/qe/StmtExecutor.java | 46 ++++++++++---------
gensrc/thrift/FrontendService.thrift | 1 +
4 files changed, 95 insertions(+), 29 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 7920ae6dae..3db4b9e036 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -38,6 +38,7 @@ import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.telemetry.Telemetry;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.datasource.CatalogIf;
@@ -62,8 +63,10 @@ import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
+import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
+import io.opentelemetry.context.propagation.TextMapGetter;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -73,7 +76,9 @@ import java.io.StringReader;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
/**
@@ -81,10 +86,23 @@ import java.util.UUID;
*/
public class ConnectProcessor {
private static final Logger LOG =
LogManager.getLogger(ConnectProcessor.class);
+ private static final TextMapGetter<Map<String, String>> getter =
+ new TextMapGetter<Map<String, String>>() {
+ @Override
+ public Iterable<String> keys(Map<String, String> carrier) {
+ return carrier.keySet();
+ }
+ @Override
+ public String get(Map<String, String> carrier, String key) {
+ if (carrier.containsKey(key)) {
+ return carrier.get(key);
+ }
+ return "";
+ }
+ };
private final ConnectContext ctx;
private ByteBuffer packetBuf;
-
private StmtExecutor executor = null;
public ConnectProcessor(ConnectContext context) {
@@ -473,8 +491,8 @@ public class ConnectProcessor {
// explain query stmt do not have profile
if (executor != null && !executor.getParsedStmt().isExplain()
&& (executor.getParsedStmt() instanceof QueryStmt // currently
only QueryStmt and insert need profile
- || executor.getParsedStmt() instanceof LogicalPlanAdapter
- || executor.getParsedStmt() instanceof InsertStmt)) {
+ || executor.getParsedStmt() instanceof LogicalPlanAdapter
+ || executor.getParsedStmt() instanceof InsertStmt)) {
executor.writeProfile(true);
}
}
@@ -543,6 +561,21 @@ public class ConnectProcessor {
}
}
+ Map<String, String> traceCarrier = new HashMap<>();
+ if (request.isSetTraceCarrier()) {
+ traceCarrier = request.getTraceCarrier();
+ }
+ Context extractedContext =
Telemetry.getOpenTelemetry().getPropagators().getTextMapPropagator()
+ .extract(Context.current(), traceCarrier, getter);
+ // What we want is for the Traceid to remain unchanged during
propagation.
+ // ctx.initTracer() will be called only if the Context is valid,
+ // so that the Traceid generated by SDKTracer is the same as the
follower. Otherwise,
+ // if the Context is invalid and ctx.initTracer() is called,
+ // SDKTracer will generate a different Traceid.
+ if (Span.fromContext(extractedContext).getSpanContext().isValid()) {
+ ctx.initTracer("master trace");
+ }
+
ctx.setThreadLocalInfo();
StmtExecutor executor = null;
try {
@@ -557,7 +590,17 @@ public class ConnectProcessor {
UUID uuid = UUID.randomUUID();
queryId = new TUniqueId(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits());
}
- executor.execute(queryId);
+ Span masterQuerySpan =
+ ctx.getTracer().spanBuilder("master
execute").setParent(extractedContext)
+ .setSpanKind(SpanKind.SERVER).startSpan();
+ try (Scope scope = masterQuerySpan.makeCurrent()) {
+ executor.execute(queryId);
+ } catch (Exception e) {
+ masterQuerySpan.recordException(e);
+ throw e;
+ } finally {
+ masterQuerySpan.end();
+ }
} catch (IOException e) {
// Client failed.
LOG.warn("Process one query failed because IOException: ", e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
index 896e7c1066..977843a01c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
@@ -19,17 +19,23 @@ package org.apache.doris.qe;
import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.telemetry.Telemetry;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TMasterOpRequest;
import org.apache.doris.thrift.TMasterOpResult;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TUniqueId;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.transport.TTransportException;
import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
public class MasterOpExecutor {
private static final Logger LOG =
LogManager.getLogger(MasterOpExecutor.class);
@@ -58,7 +64,17 @@ public class MasterOpExecutor {
}
public void execute() throws Exception {
- forward();
+ Span forwardSpan =
+
ctx.getTracer().spanBuilder("forward").setParent(Context.current())
+ .startSpan();
+ try (Scope scope = forwardSpan.makeCurrent()) {
+ forward();
+ } catch (Exception e) {
+ forwardSpan.recordException(e);
+ throw e;
+ } finally {
+ forwardSpan.end();
+ }
LOG.info("forwarding to master get result max journal id: {}",
result.maxJournalId);
ctx.getEnv().getJournalObservable().waitOn(result.maxJournalId,
waitTimeoutMs);
}
@@ -95,6 +111,14 @@ public class MasterOpExecutor {
// session variables
params.setSessionVariables(ctx.getSessionVariable().getForwardVariables());
+ // create a trace carrier
+ Map<String, String> traceCarrier = new HashMap<String, String>();
+ // Inject the request with the current context
+ Telemetry.getOpenTelemetry().getPropagators().getTextMapPropagator()
+ .inject(Context.current(), traceCarrier, (carrier, key, value)
-> carrier.put(key, value));
+ // carrier send tracing to master
+ params.setTraceCarrier(traceCarrier);
+
if (null != ctx.queryId()) {
params.setQueryId(ctx.queryId());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 93779ad212..c514beee81 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -160,7 +160,8 @@ public class StmtExecutor implements ProfileWriter {
private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0);
private static final int MAX_DATA_TO_SEND_FOR_TXN = 100;
-
+ private static final String NULL_VALUE_FOR_LOAD = "\\N";
+ private final Object writeProfileLock = new Object();
private ConnectContext context;
private StatementContext statementContext;
private MysqlSerializer serializer;
@@ -170,7 +171,6 @@ public class StmtExecutor implements ProfileWriter {
private RuntimeProfile profile;
private RuntimeProfile summaryProfile;
private RuntimeProfile plannerRuntimeProfile;
- private final Object writeProfileLock = new Object();
private volatile boolean isFinishedProfile = false;
private String queryType = "Query";
private volatile Coordinator coord = null;
@@ -181,7 +181,6 @@ public class StmtExecutor implements ProfileWriter {
private ShowResultSet proxyResultSet = null;
private Data.PQueryStatistics.Builder statisticsForAuditLog;
private boolean isCached;
-
private QueryPlannerProfile plannerProfile = new QueryPlannerProfile();
// this constructor is mainly for proxy
@@ -209,6 +208,21 @@ public class StmtExecutor implements ProfileWriter {
this.statementContext.setParsedStatement(parsedStmt);
}
+ public static InternalService.PDataRow getRowStringValue(List<Expr> cols) {
+ if (cols.size() == 0) {
+ return null;
+ }
+ InternalService.PDataRow.Builder row =
InternalService.PDataRow.newBuilder();
+ for (Expr expr : cols) {
+ if (expr instanceof NullLiteral) {
+ row.addColBuilder().setValue(NULL_VALUE_FOR_LOAD);
+ } else {
+ row.addColBuilder().setValue(expr.getStringValue());
+ }
+ }
+ return row.build();
+ }
+
public void setCoord(Coordinator coord) {
this.coord = coord;
}
@@ -336,7 +350,7 @@ public class StmtExecutor implements ProfileWriter {
/**
* Used for audit in ConnectProcessor.
- *
+ * <p>
* TODO: There are three interface in StatementBase be called when doing
audit:
* toDigest needAuditEncryption when parsedStmt is not a query
* and isValuesOrConstantSelect when parsedStmt is instance of
InsertStmt.
@@ -346,7 +360,7 @@ public class StmtExecutor implements ProfileWriter {
* isValuesOrConstantSelect: when this interface return true,
original string is truncated at 1024
*
* @return parsed and analyzed statement for Stale planner.
- * an unresolved LogicalPlan wrapped with a LogicalPlanAdapter
for Nereids.
+ * an unresolved LogicalPlan wrapped with a LogicalPlanAdapter for Nereids.
*/
public StatementBase getParsedStmt() {
return parsedStmt;
@@ -569,6 +583,7 @@ public class StmtExecutor implements ProfileWriter {
/**
* get variables in stmt.
* TODO: only support select stmt now. need to support Nereids.
+ *
* @throws DdlException
*/
private void analyzeVariablesInStmt() throws DdlException {
@@ -897,7 +912,7 @@ public class StmtExecutor implements ProfileWriter {
// the meta fields must be sent right before the first batch of data(or
eos flag).
// so if it has data(or eos is true), this method must return true.
private boolean sendCachedValues(MysqlChannel channel,
List<InternalService.PCacheValue> cacheValues,
- SelectStmt selectStmt, boolean
isSendFields, boolean isEos)
+ SelectStmt selectStmt, boolean isSendFields, boolean isEos)
throws Exception {
RowBatch batch = null;
boolean isSend = isSendFields;
@@ -1123,7 +1138,7 @@ public class StmtExecutor implements ProfileWriter {
plannerProfile.setQueryFetchResultFinishTime();
} catch (Exception e) {
fetchResultSpan.recordException(e);
- throw e;
+ throw e;
} finally {
fetchResultSpan.end();
}
@@ -1331,23 +1346,6 @@ public class StmtExecutor implements ProfileWriter {
executor.beginTransaction(request);
}
- private static final String NULL_VALUE_FOR_LOAD = "\\N";
-
- public static InternalService.PDataRow getRowStringValue(List<Expr> cols) {
- if (cols.size() == 0) {
- return null;
- }
- InternalService.PDataRow.Builder row =
InternalService.PDataRow.newBuilder();
- for (Expr expr : cols) {
- if (expr instanceof NullLiteral) {
- row.addColBuilder().setValue(NULL_VALUE_FOR_LOAD);
- } else {
- row.addColBuilder().setValue(expr.getStringValue());
- }
- }
- return row.build();
- }
-
// Process a select statement.
private void handleInsertStmt() throws Exception {
// Every time set no send flag and clean all data in buffer
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index ebb479140a..d126bf4dc5 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -435,6 +435,7 @@ struct TMasterOpRequest {
18: optional i64 insert_visible_timeout_ms // deprecated, move into
session_variables
19: optional map<string, string> session_variables
20: optional bool foldConstantByBe
+ 21: optional map<string, string> trace_carrier
}
struct TColumnDefinition {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]