This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new a2e342839a [feature](profile) backport stream load profile into
1.2-lts (#21784)
a2e342839a is described below
commit a2e342839a17d315e72788377604e29e5146a17d
Author: Kaijie Chen <[email protected]>
AuthorDate: Mon Jul 17 16:15:40 2023 +0800
[feature](profile) backport stream load profile into 1.2-lts (#21784)
Backport stream load profile into 1.2-lts, see #18364.
The commented fields in proto is reserved for future backports.
---
be/src/http/action/stream_load.cpp | 7 +++++++
be/src/http/http_common.h | 1 +
be/src/runtime/fragment_mgr.cpp | 1 +
.../main/java/org/apache/doris/planner/StreamLoadPlanner.java | 1 +
.../src/main/java/org/apache/doris/qe/QeProcessorImpl.java | 11 +++++++++--
.../src/main/java/org/apache/doris/task/LoadTaskInfo.java | 4 ++++
.../src/main/java/org/apache/doris/task/StreamLoadTask.java | 9 +++++++++
gensrc/thrift/FrontendService.thrift | 6 ++++++
8 files changed, 38 insertions(+), 2 deletions(-)
diff --git a/be/src/http/action/stream_load.cpp
b/be/src/http/action/stream_load.cpp
index 5f036374c2..b9f2700968 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -572,6 +572,13 @@ Status StreamLoadAction::_process_put(HttpRequest*
http_req, StreamLoadContext*
request.__set_trim_double_quotes(false);
}
}
+ if (!http_req->header(HTTP_ENABLE_PROFILE).empty()) {
+ if (iequal(http_req->header(HTTP_ENABLE_PROFILE), "true")) {
+ request.__set_enable_profile(true);
+ } else {
+ request.__set_enable_profile(false);
+ }
+ }
#ifndef BE_TEST
// plan this load
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index e61c828fb4..9c9dbf9296 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -52,6 +52,7 @@ static const std::string HTTP_SEND_BATCH_PARALLELISM =
"send_batch_parallelism";
static const std::string HTTP_LOAD_TO_SINGLE_TABLET = "load_to_single_tablet";
static const std::string HTTP_HIDDEN_COLUMNS = "hidden_columns";
static const std::string HTTP_TRIM_DOUBLE_QUOTES = "trim_double_quotes";
+static const std::string HTTP_ENABLE_PROFILE = "enable_profile";
static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit";
static const std::string HTTP_TXN_ID_KEY = "txn_id";
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 2b069d78a2..3bed90a59e 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -327,6 +327,7 @@ void FragmentExecState::coordinator_callback(const Status&
status, RuntimeProfil
RuntimeState* runtime_state = _executor.runtime_state();
DCHECK(runtime_state != nullptr);
+ params.__set_query_type(runtime_state->query_type());
if (runtime_state->query_type() == TQueryType::LOAD && !done &&
status.ok()) {
// this is a load plan, and load is not finished, just make a brief
report
params.__set_loaded_rows(runtime_state->num_rows_load_total());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 634379ed7d..c61fd96976 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -259,6 +259,7 @@ public class StreamLoadPlanner {
queryOptions.setLoadMemLimit(taskInfo.getMemLimit());
queryOptions.setEnableVectorizedEngine(Config.enable_vectorized_load);
queryOptions.setBeExecVersion(Config.be_exec_version);
+ queryOptions.setIsReportSuccess(taskInfo.getEnableProfile());
params.setQueryOptions(queryOptions);
TQueryGlobals queryGlobals = new TQueryGlobals();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index 1bf11ece97..7e79fc47ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ProfileWriter;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TReportExecStatusResult;
import org.apache.doris.thrift.TStatus;
@@ -188,8 +189,14 @@ public final class QeProcessorImpl implements QeProcessor {
final TReportExecStatusResult result = new TReportExecStatusResult();
final QueryInfo info = coordinatorMap.get(params.query_id);
if (info == null) {
- result.setStatus(new TStatus(TStatusCode.RUNTIME_ERROR));
- LOG.info("ReportExecStatus() runtime error, query {} does not
exist", DebugUtil.printId(params.query_id));
+ // There is no QueryInfo for StreamLoad, so we return OK
+ if (params.query_type == TQueryType.LOAD) {
+ result.setStatus(new TStatus(TStatusCode.OK));
+ } else {
+ result.setStatus(new TStatus(TStatusCode.RUNTIME_ERROR));
+ }
+ LOG.info("ReportExecStatus() runtime error, query {} with type {}
does not exist",
+ DebugUtil.printId(params.query_id), params.query_type);
return result;
}
try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
index e384394653..47d18ef205 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
@@ -101,6 +101,10 @@ public interface LoadTaskInfo {
return false;
}
+ default boolean getEnableProfile() {
+ return false;
+ }
+
class ImportColumnDescs {
public List<ImportColumnDesc> descs = Lists.newArrayList();
public boolean isColumnDescsRewrited = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index 2a80ceef90..cae47eca28 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -83,6 +83,7 @@ public class StreamLoadTask implements LoadTaskInfo {
private String headerType = "";
private List<String> hiddenColumns;
private boolean trimDoubleQuotes = false;
+ private boolean enableProfile = false;
public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType,
TFileFormatType formatType,
TFileCompressType compressType) {
@@ -257,6 +258,11 @@ public class StreamLoadTask implements LoadTaskInfo {
return trimDoubleQuotes;
}
+ @Override
+ public boolean getEnableProfile() {
+ return enableProfile;
+ }
+
public static StreamLoadTask
fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException {
StreamLoadTask streamLoadTask = new
StreamLoadTask(request.getLoadId(), request.getTxnId(),
request.getFileType(), request.getFormatType(),
@@ -359,6 +365,9 @@ public class StreamLoadTask implements LoadTaskInfo {
if (request.isSetTrimDoubleQuotes()) {
trimDoubleQuotes = request.isTrimDoubleQuotes();
}
+ if (request.isSetEnableProfile()) {
+ enableProfile = request.isEnableProfile();
+ }
}
// used for stream load
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 32c73a5663..3634a11333 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -407,6 +407,10 @@ struct TReportExecStatusParams {
17: optional i64 loaded_bytes
18: optional list<Types.TErrorTabletInfo> errorTabletInfos
+
+ // 19: optional i32 fragment_id
+
+ 20: optional PaloInternalService.TQueryType query_type
}
struct TFeResult {
@@ -547,6 +551,8 @@ struct TStreamLoadPutRequest {
40: optional PlanNodes.TFileCompressType compress_type
41: optional i64 file_size // only for stream load with parquet or orc
42: optional bool trim_double_quotes // trim double quotes for csv
+ // 43: optional i32 skip_lines // csv skip line num, only used when csv
header_type is not set.
+ 44: optional bool enable_profile
}
struct TStreamLoadPutResult {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]