This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ea39d50532a [feature](cloud) enable pipeline in cloud_p0 (#31825)
ea39d50532a is described below
commit ea39d50532aa98a3245c147187cdf269e00c1809
Author: Yongqiang YANG <[email protected]>
AuthorDate: Thu Mar 7 17:32:11 2024 +0800
[feature](cloud) enable pipeline in cloud_p0 (#31825)
---
be/src/exec/data_sink.cpp | 14 ++++----------
be/src/pipeline/pipeline_fragment_context.cpp | 4 +++-
be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 4 +++-
.../pipeline/cloud_p0/conf/regression-conf-custom.groovy | 4 ++--
.../pipeline/cloud_p0/conf/session_variables.sql | 3 +--
5 files changed, 13 insertions(+), 16 deletions(-)
diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index 62acd908783..79c2b361bb5 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -150,11 +150,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const
TDataSink& thrift_sink
case TDataSinkType::OLAP_TABLE_SINK: {
DCHECK(thrift_sink.__isset.olap_table_sink);
if (state->query_options().enable_memtable_on_sink_node &&
-
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
- if (config::is_cloud_mode()) {
- return Status::InternalError(
- "memtable on sink node is not supported in cloud
mode");
- }
+
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink) &&
+ !config::is_cloud_mode()) {
sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc,
output_exprs));
} else {
sink->reset(new vectorized::VOlapTableSink(pool, row_desc,
output_exprs));
@@ -295,11 +292,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const
TDataSink& thrift_sink
case TDataSinkType::OLAP_TABLE_SINK: {
DCHECK(thrift_sink.__isset.olap_table_sink);
if (state->query_options().enable_memtable_on_sink_node &&
-
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
- if (config::is_cloud_mode()) {
- return Status::InternalError(
- "memtable on sink node is not supported in cloud
mode");
- }
+
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink) &&
+ !config::is_cloud_mode()) {
sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc,
output_exprs));
} else {
sink->reset(new vectorized::VOlapTableSink(pool, row_desc,
output_exprs));
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index b4c5646402e..4c61978f70a 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -35,6 +35,7 @@
#include <typeinfo>
#include <utility>
+#include "cloud/config.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
@@ -820,7 +821,8 @@ Status PipelineFragmentContext::_create_sink(int sender_id,
const TDataSink& thr
case TDataSinkType::OLAP_TABLE_SINK: {
DCHECK(thrift_sink.__isset.olap_table_sink);
if (state->query_options().enable_memtable_on_sink_node &&
-
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
+
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink) &&
+ !config::is_cloud_mode()) {
sink_ =
std::make_shared<OlapTableSinkV2OperatorBuilder>(next_operator_builder_id(),
_sink.get());
} else {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index daae5feecfe..65c533ff7e3 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -32,6 +32,7 @@
#include <ostream>
#include <utility>
+#include "cloud/config.h"
#include "common/config.h"
#include "common/logging.h"
#include "exec/data_sink.h"
@@ -358,7 +359,8 @@ Status
PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
}
case TDataSinkType::OLAP_TABLE_SINK: {
if (state->query_options().enable_memtable_on_sink_node &&
-
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
+
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink) &&
+ !config::is_cloud_mode()) {
_sink.reset(new OlapTableSinkV2OperatorX(pool,
next_sink_operator_id(), row_desc,
output_exprs));
} else {
diff --git
a/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
b/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
index 97a2f1515aa..469b0f490ff 100644
--- a/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
+++ b/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
@@ -17,5 +17,5 @@
testGroups = "p0"
//exclude groups and exclude suites is more prior than include groups and
include suites.
-excludeSuites =
"test_index_failure_injection,test_dump_image,test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter,test_information_schema_external,test_primary_key_partial_update_parallel"
-excludeDirectories =
"workload_manager_p1,nereids_rules_p0/subquery,unique_with_mow_p0/cluster_key,unique_with_mow_p0/ssb_unique_sql_zstd_cluster,point_query_p0,nereids_rules_p0/mv"
+excludeSuites =
"test_index_failure_injection,test_dump_image,test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter,test_information_schema_external,test_primary_key_partial_update_parallel,test_stream_load_new_move_memtable,test_stream_load_move_memtable,test_materialized_view_move_memtable,test_disable_move_memtable,test_insert_move_memtable,set_and_unset_variable,test_pk_uk_case_cluster,test_point_query_cluster_key,test_compaction_uniq_cluster_keys_with_delete,test_compact
[...]
+excludeDirectories =
"workload_manager_p1,nereids_rules_p0/subquery,unique_with_mow_p0/cluster_key,unique_with_mow_p0/ssb_unique_sql_zstd_cluster,unique_with_mow_p0/ssb_unique_load_zstd_c,nereids_rules_p0/mv"
diff --git a/regression-test/pipeline/cloud_p0/conf/session_variables.sql
b/regression-test/pipeline/cloud_p0/conf/session_variables.sql
index 962464502ec..4cad626a31d 100644
--- a/regression-test/pipeline/cloud_p0/conf/session_variables.sql
+++ b/regression-test/pipeline/cloud_p0/conf/session_variables.sql
@@ -2,5 +2,4 @@
set global insert_visible_timeout_ms=60000;
set global enable_auto_analyze=false;
set global enable_audit_plugin=true;
-set global enable_memtable_on_sink_node=false;
-set global enable_pipeline_x_engine=false;
+set global enable_memtable_on_sink_node=false;
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]