This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new db17f5fe791 [improve](move-memtbale) enable move memtable in routine load (#28974)
db17f5fe791 is described below
commit db17f5fe791af648060cdcad673a30ce7283b39c
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Sat Jan 6 18:22:01 2024 +0800
[improve](move-memtbale) enable move memtable in routine load (#28974)
---
be/src/io/fs/multi_table_pipe.cpp | 1 +
.../routine_load/routine_load_task_executor.cpp | 3 +++
be/src/runtime/stream_load/stream_load_context.h | 2 ++
.../apache/doris/load/routineload/KafkaTaskInfo.java | 1 +
.../apache/doris/load/routineload/RoutineLoadJob.java | 18 +++++++++++++++++-
gensrc/thrift/BackendService.thrift | 1 +
6 files changed, 25 insertions(+), 1 deletion(-)
diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp
index a11d6412df2..da46645fd4f 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -162,6 +162,7 @@ Status MultiTablePipe::request_and_exec_plans() {
request.__set_loadId(_ctx->id.to_thrift());
request.fileType = TFileType::FILE_STREAM;
request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms);
+ request.__set_memtable_on_sink_node(_ctx->memtable_on_sink_node);
// no need to register new_load_stream_mgr coz it is already done in routineload submit task
// plan this load
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index d14b2999c14..dc6b855cd5a 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -213,6 +213,9 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
if (task.__isset.is_multi_table && task.is_multi_table) {
ctx->is_multi_table = true;
}
+ if (task.__isset.memtable_on_sink_node) {
+ ctx->memtable_on_sink_node = task.memtable_on_sink_node;
+ }
// set execute plan params (only for non-single-stream-multi-table load)
TStreamLoadPutResult put_result;
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index e57996af9e1..3f1f6b92431 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -232,6 +232,8 @@ public:
// for single-stream-multi-table, we have table list
std::vector<std::string> table_list;
+ bool memtable_on_sink_node = false;
+
public:
ExecEnv* exec_env() { return _exec_env; }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index 56e931993e8..2075e5548e5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -108,6 +108,7 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
} else {
tRoutineLoadTask.setFormat(TFileFormatType.FORMAT_CSV_PLAIN);
}
+ tRoutineLoadTask.setMemtableOnSinkNode(routineLoadJob.isMemtableOnSinkNode());
return tRoutineLoadTask;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 8bea553c792..1ce3c1e8c0c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -202,6 +202,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
protected String sequenceCol;
+ protected boolean memtableOnSinkNode = false;
+
/**
* RoutineLoad support json data.
* Require Params:
@@ -268,6 +270,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
public RoutineLoadJob(long id, LoadDataSourceType type) {
this.id = id;
this.dataSourceType = type;
+ if (ConnectContext.get() != null) {
+ this.memtableOnSinkNode = ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
+ }
}
public RoutineLoadJob(Long id, String name,
@@ -283,6 +288,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
if (ConnectContext.get() != null) {
SessionVariable var = ConnectContext.get().getSessionVariable();
sessionVariables.put(SessionVariable.SQL_MODE, Long.toString(var.getSqlMode()));
+ this.memtableOnSinkNode = ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
} else {
sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT));
}
@@ -304,6 +310,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
if (ConnectContext.get() != null) {
SessionVariable var = ConnectContext.get().getSessionVariable();
sessionVariables.put(SessionVariable.SQL_MODE, Long.toString(var.getSqlMode()));
+ this.memtableOnSinkNode = ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
} else {
sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT));
}
@@ -686,6 +693,15 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
return !Strings.isNullOrEmpty(sequenceCol);
}
+ @Override
+ public boolean isMemtableOnSinkNode() {
+ return memtableOnSinkNode;
+ }
+
+ public void setMemtableOnSinkNode(boolean memtableOnSinkNode) {
+ this.memtableOnSinkNode = memtableOnSinkNode;
+ }
+
public void setComment(String comment) {
this.comment = comment;
}
@@ -874,11 +890,11 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
}
private void initPlanner() throws UserException {
- Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
// for multi table load job, the table name is dynamic,we will set table when task scheduling.
if (isMultiTable) {
return;
}
+ Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
planner = new StreamLoadPlanner(db,
(OlapTable) db.getTableOrMetaException(this.tableId, Table.TableType.OLAP), this);
}
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index 2c5b199fc17..dab0b860677 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -67,6 +67,7 @@ struct TRoutineLoadTask {
14: optional PlanNodes.TFileFormatType format
15: optional PaloInternalService.TPipelineFragmentParams pipeline_params
16: optional bool is_multi_table
+ 17: optional bool memtable_on_sink_node;
}
struct TKafkaMetaProxyRequest {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]