This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 49a32c2ee0 [pipelineX](fix) fix two phase execution and add test cases 
(#23353)
49a32c2ee0 is described below

commit 49a32c2ee0015b11eb8aa7bcf4fa73ab33035eba
Author: Gabriel <[email protected]>
AuthorDate: Fri Aug 25 17:57:35 2023 +0800

    [pipelineX](fix) fix two phase execution and add test cases (#23353)
---
 .../pipeline/exec/aggregation_source_operator.cpp  |  3 +--
 be/src/pipeline/exec/exchange_source_operator.cpp  |  6 +----
 be/src/pipeline/exec/exchange_source_operator.h    | 10 +++++++++
 be/src/pipeline/exec/operator.h                    |  7 ++++++
 be/src/pipeline/exec/result_sink_operator.cpp      | 16 +++++--------
 be/src/pipeline/exec/result_sink_operator.h        |  5 +++--
 be/src/pipeline/pipeline_task.h                    |  2 ++
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 26 +++++++++++++++++++---
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     | 12 ++++++----
 be/src/pipeline/pipeline_x/pipeline_x_task.h       |  8 ++++++-
 be/src/pipeline/task_scheduler.cpp                 |  3 ++-
 .../suites/ssb_sf0.1_p1/sql/flat_q1.1.sql          |  2 +-
 .../suites/ssb_sf0.1_p1/sql/flat_q1.2.sql          |  2 +-
 .../suites/ssb_sf0.1_p1/sql/flat_q1.3.sql          |  2 +-
 .../suites/ssb_sf0.1_p1/sql/flat_q2.1.sql          |  2 +-
 .../suites/ssb_sf0.1_p1/sql/flat_q2.2.sql          |  2 +-
 .../suites/ssb_sf0.1_p1/sql/flat_q2.3.sql          |  2 +-
 .../suites/ssb_sf0.1_p1/sql/flat_q3.1.sql          |  2 +-
 .../suites/ssb_sf0.1_p1/sql/flat_q3.2.sql          |  2 +-
 .../suites/ssb_sf0.1_p1/sql/flat_q3.3.sql          |  2 +-
 .../suites/ssb_sf0.1_p1/sql/flat_q3.4.sql          |  2 +-
 .../suites/ssb_sf0.1_p1/sql/flat_q4.1.sql          |  2 +-
 .../suites/ssb_sf0.1_p1/sql/flat_q4.2.sql          |  2 +-
 .../suites/ssb_sf0.1_p1/sql/flat_q4.3.sql          |  2 +-
 24 files changed, 82 insertions(+), 42 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp 
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 2f91a633f2..cbee5a832f 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -557,8 +557,7 @@ Status AggSourceOperatorX::setup_local_state(RuntimeState* 
state, LocalStateInfo
 }
 
 bool AggSourceOperatorX::can_read(RuntimeState* state) {
-    auto& local_state = state->get_local_state(id())->cast<AggLocalState>();
-    return local_state._dependency->done();
+    return 
state->get_local_state(id())->cast<AggLocalState>()._dependency->done();
 }
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp 
b/be/src/pipeline/exec/exchange_source_operator.cpp
index c593e5ab96..dadf70f0de 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -48,11 +48,7 @@ Status ExchangeLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
         return Status::OK();
     }
     RETURN_IF_ERROR(PipelineXLocalState::init(state, info));
-    auto& parent_ref = _parent->cast<ExchangeSourceOperatorX>();
-    stream_recvr = _state->exec_env()->vstream_mgr()->create_recvr(
-            _state, parent_ref._input_row_desc, 
_state->fragment_instance_id(), parent_ref._id,
-            parent_ref._num_senders, profile(), parent_ref._is_merging,
-            parent_ref._sub_plan_query_statistics_recvr);
+    stream_recvr = info.recvr;
     
RETURN_IF_ERROR(_parent->cast<ExchangeSourceOperatorX>()._vsort_exec_exprs.clone(
             state, vsort_exec_exprs));
     _init = true;
diff --git a/be/src/pipeline/exec/exchange_source_operator.h 
b/be/src/pipeline/exec/exchange_source_operator.h
index d717970a97..d599d3d06d 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -80,6 +80,16 @@ public:
 
     Status close(RuntimeState* state) override;
     bool is_source() const override { return true; }
+    bool need_to_create_exch_recv() const override { return true; }
+
+    RowDescriptor input_row_desc() const { return _input_row_desc; }
+
+    int num_senders() const { return _num_senders; }
+    bool is_merging() const { return _is_merging; }
+
+    std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr() {
+        return _sub_plan_query_statistics_recvr;
+    }
 
 private:
     friend class ExchangeLocalState;
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index dbc323cc67..2fb484afc0 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -36,6 +36,8 @@
 #include "runtime/runtime_state.h"
 #include "util/runtime_profile.h"
 #include "vec/core/block.h"
+#include "vec/runtime/vdata_stream_recvr.h"
+#include "vec/sink/vresult_sink.h"
 
 namespace doris {
 class DataSink;
@@ -484,12 +486,14 @@ protected:
 struct LocalStateInfo {
     const std::vector<TScanRangeParams> scan_ranges;
     Dependency* dependency;
+    std::shared_ptr<vectorized::VDataStreamRecvr> recvr;
 };
 
 // This struct is used only for initializing local sink state.
 struct LocalSinkStateInfo {
     const int sender_id;
     Dependency* dependency;
+    std::shared_ptr<BufferControlBlock> sender;
 };
 
 class PipelineXLocalState {
@@ -674,6 +678,7 @@ public:
     }
 
     virtual bool is_source() const override { return false; }
+    [[nodiscard]] virtual bool need_to_create_exch_recv() const { return 
false; }
 
     Status get_next_after_projects(RuntimeState* state, vectorized::Block* 
block,
                                    SourceState& source_state);
@@ -768,6 +773,8 @@ public:
 
     virtual Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& 
info) = 0;
 
+    [[nodiscard]] virtual bool need_to_create_result_sender() const { return 
false; }
+
     template <class TARGET>
     TARGET& cast() {
         DCHECK(dynamic_cast<TARGET*>(this));
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp 
b/be/src/pipeline/exec/result_sink_operator.cpp
index df917c26af..ce8a933d7e 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -58,9 +58,7 @@ Status ResultSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info)
     // create profile
     _profile = state->obj_pool()->add(new RuntimeProfile(title));
     // create sender
-    
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(state->fragment_instance_id(),
-                                                                   
p._buf_size, &_sender, true,
-                                                                   
state->execution_timeout()));
+    _sender = info.sender;
     _output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size());
     for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
         RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, 
_output_vexpr_ctxs[i]));
@@ -81,11 +79,8 @@ Status ResultSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info)
 
 ResultSinkOperatorX::ResultSinkOperatorX(const RowDescriptor& row_desc,
                                          const std::vector<TExpr>& 
t_output_expr,
-                                         const TResultSink& sink, int 
buffer_size)
-        : DataSinkOperatorX(0),
-          _row_desc(row_desc),
-          _t_output_expr(t_output_expr),
-          _buf_size(buffer_size) {
+                                         const TResultSink& sink)
+        : DataSinkOperatorX(0), _row_desc(row_desc), 
_t_output_expr(t_output_expr) {
     if (!sink.__isset.type || sink.type == TResultSinkType::MYSQL_PROTOCAL) {
         _sink_type = TResultSinkType::MYSQL_PROTOCAL;
     } else {
@@ -185,7 +180,6 @@ Status ResultSinkLocalState::close(RuntimeState* state) {
 }
 
 bool ResultSinkOperatorX::can_write(RuntimeState* state) {
-    auto& local_state = 
state->get_sink_local_state(id())->cast<ResultSinkLocalState>();
-    return local_state._sender->can_sink();
+    return 
state->get_sink_local_state(id())->cast<ResultSinkLocalState>()._sender->can_sink();
 }
-} // namespace doris::pipeline
\ No newline at end of file
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/result_sink_operator.h 
b/be/src/pipeline/exec/result_sink_operator.h
index e2e2e517f8..e98bae86e7 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -65,7 +65,7 @@ private:
 class ResultSinkOperatorX final : public DataSinkOperatorX {
 public:
     ResultSinkOperatorX(const RowDescriptor& row_desc, const 
std::vector<TExpr>& select_exprs,
-                        const TResultSink& sink, int buffer_size);
+                        const TResultSink& sink);
     Status prepare(RuntimeState* state) override;
     Status open(RuntimeState* state) override;
     Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) 
override;
@@ -75,6 +75,8 @@ public:
 
     bool can_write(RuntimeState* state) override;
 
+    [[nodiscard]] bool need_to_create_result_sender() const override { return 
true; }
+
 private:
     friend class ResultSinkLocalState;
 
@@ -89,7 +91,6 @@ private:
     // Owned by the RuntimeState.
     const std::vector<TExpr>& _t_output_expr;
     vectorized::VExprContextSPtrs _output_vexpr_ctxs;
-    int _buf_size; // Allocated from _pool
 
     // for fetch data by rowids
     TFetchOption _fetch_option;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 57d7659197..fc66ca54f1 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -246,6 +246,8 @@ public:
         }
     }
 
+    TUniqueId instance_id() const { return _state->fragment_instance_id(); }
+
 protected:
     void _finish_p_dependency() {
         for (const auto& p : _pipeline->_parents) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 7e97696a62..1a569202fb 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -26,6 +26,7 @@
 #include <opentelemetry/trace/span_context.h>
 #include <opentelemetry/trace/tracer.h>
 #include <pthread.h>
+#include <runtime/result_buffer_mgr.h>
 #include <stdlib.h>
 // IWYU pragma: no_include <bits/chrono.h>
 #include <chrono> // IWYU pragma: keep
@@ -252,8 +253,7 @@ Status 
PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
         }
 
         // TODO: figure out good buffer size based on size of output row
-        _sink.reset(new ResultSinkOperatorX(row_desc, output_exprs, 
thrift_sink.result_sink,
-                                            
vectorized::RESULT_SINK_BUFFER_SIZE));
+        _sink.reset(new ResultSinkOperatorX(row_desc, output_exprs, 
thrift_sink.result_sink));
         break;
     }
     default:
@@ -302,10 +302,30 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
             auto scan_ranges = 
find_with_default(local_params.per_node_scan_ranges,
                                                  
_pipelines[pip_idx]->operator_xs().front()->id(),
                                                  no_scan_ranges);
+            std::shared_ptr<BufferControlBlock> sender = nullptr;
+            if (_pipelines[pip_idx]->sink_x()->need_to_create_result_sender()) 
{
+                // create sender
+                
RETURN_IF_ERROR(_runtime_states[i]->exec_env()->result_mgr()->create_sender(
+                        _runtime_states[i]->fragment_instance_id(),
+                        vectorized::RESULT_SINK_BUFFER_SIZE, &sender, true,
+                        _runtime_states[i]->execution_timeout()));
+            }
+
+            std::shared_ptr<vectorized::VDataStreamRecvr> recvr = nullptr;
+            if 
(_pipelines[pip_idx]->operator_xs().front()->need_to_create_exch_recv()) {
+                auto* src =
+                        
(ExchangeSourceOperatorX*)_pipelines[pip_idx]->operator_xs().front().get();
+                recvr = 
_runtime_states[i]->exec_env()->vstream_mgr()->create_recvr(
+                        _runtime_states[i].get(), src->input_row_desc(),
+                        _runtime_states[i]->fragment_instance_id(), src->id(), 
src->num_senders(),
+                        _runtime_profile.get(), src->is_merging(),
+                        src->sub_plan_query_statistics_recvr());
+            }
 
             auto task = std::make_unique<PipelineXTask>(
                     _pipelines[pip_idx], _total_tasks++, 
_runtime_states[i].get(), this,
-                    _pipelines[pip_idx]->pipeline_profile(), scan_ranges, 
local_params.sender_id);
+                    _pipelines[pip_idx]->pipeline_profile(), scan_ranges, 
local_params.sender_id,
+                    sender, recvr);
             pipeline_id_to_task.insert({_pipelines[pip_idx]->id(), 
task.get()});
             RETURN_IF_ERROR(task->prepare(_runtime_states[i].get()));
             
_runtime_profile->add_child(_pipelines[pip_idx]->pipeline_profile(), true, 
nullptr);
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 676414fce5..f55c11982e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -45,14 +45,18 @@ namespace doris::pipeline {
 PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t index, 
RuntimeState* state,
                              PipelineFragmentContext* fragment_context,
                              RuntimeProfile* parent_profile,
-                             const std::vector<TScanRangeParams>& scan_ranges, 
const int sender_id)
+                             const std::vector<TScanRangeParams>& scan_ranges, 
const int sender_id,
+                             std::shared_ptr<BufferControlBlock>& sender,
+                             std::shared_ptr<vectorized::VDataStreamRecvr>& 
recvr)
         : PipelineTask(pipeline, index, state, fragment_context, 
parent_profile),
           _scan_ranges(scan_ranges),
           _operators(pipeline->operator_xs()),
           _source(_operators.front()),
           _root(_operators.back()),
           _sink(pipeline->sink_shared_pointer()),
-          _sender_id(sender_id) {
+          _sender_id(sender_id),
+          _sender(sender),
+          _recvr(recvr) {
     _pipeline_task_watcher.start();
     _sink->get_dependency(_downstream_dependency);
 }
@@ -99,13 +103,13 @@ Status PipelineXTask::_open() {
         Dependency* dep = _upstream_dependency.find(o->id()) == 
_upstream_dependency.end()
                                   ? (Dependency*)nullptr
                                   : 
_upstream_dependency.find(o->id())->second.get();
-        LocalStateInfo info {_scan_ranges, dep};
+        LocalStateInfo info {_scan_ranges, dep, _recvr};
         Status cur_st = o->setup_local_state(_state, info);
         if (!cur_st.ok()) {
             st = cur_st;
         }
     }
-    LocalSinkStateInfo info {_sender_id, _downstream_dependency.get()};
+    LocalSinkStateInfo info {_sender_id, _downstream_dependency.get(), 
_sender};
     RETURN_IF_ERROR(_sink->setup_local_state(_state, info));
     RETURN_IF_ERROR(st);
     _opened = true;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h 
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 1453b10ba2..864709b4ed 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -31,6 +31,7 @@
 #include "util/runtime_profile.h"
 #include "util/stopwatch.hpp"
 #include "vec/core/block.h"
+#include "vec/sink/vresult_sink.h"
 
 namespace doris {
 class QueryContext;
@@ -50,7 +51,9 @@ class PipelineXTask : public PipelineTask {
 public:
     PipelineXTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state,
                   PipelineFragmentContext* fragment_context, RuntimeProfile* 
parent_profile,
-                  const std::vector<TScanRangeParams>& scan_ranges, const int 
sender_id);
+                  const std::vector<TScanRangeParams>& scan_ranges, const int 
sender_id,
+                  std::shared_ptr<BufferControlBlock>& sender,
+                  std::shared_ptr<vectorized::VDataStreamRecvr>& recvr);
 
     Status prepare(RuntimeState* state) override;
 
@@ -127,5 +130,8 @@ private:
 
     DependencyMap _upstream_dependency;
     DependencySPtr _downstream_dependency;
+
+    std::shared_ptr<BufferControlBlock> _sender;
+    std::shared_ptr<vectorized::VDataStreamRecvr> _recvr;
 };
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index b792f0f4c6..0be333479b 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -123,7 +123,8 @@ void BlockedTaskScheduler::_schedule() {
             } else if (task->query_context()->is_timeout(now)) {
                 LOG(WARNING) << "Timeout, query_id=" << 
print_id(task->query_context()->query_id())
                              << ", instance_id="
-                             << 
print_id(task->fragment_context()->get_fragment_instance_id());
+                             << 
print_id(task->fragment_context()->get_fragment_instance_id())
+                             << ", task info: " << task->debug_string();
 
                 
task->fragment_context()->cancel(PPlanFragmentCancelReason::TIMEOUT);
                 _make_task_run(local_blocked_tasks, iter, ready_tasks);
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.1.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.1.sql
index eae02823a0..c3d19b67a2 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.1.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 
SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
 FROM lineorder_flat
 WHERE
     LO_ORDERDATE >= 19930101
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.2.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.2.sql
index 3a899c9344..6ab6ceea34 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.2.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 --Q1.2
-SELECT SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 
SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
 FROM lineorder_flat
 WHERE
     LO_ORDERDATE >= 19940101
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.3.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.3.sql
index 5aaeff83a7..70796c2a95 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.3.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 --Q1.3
-SELECT SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 
SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
 FROM lineorder_flat
 WHERE
     weekofyear(LO_ORDERDATE) = 6
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.1.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.1.sql
index 254ea6481a..57f2ada296 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.1.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 --Q2.1
-SELECT
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
     SUM(LO_REVENUE), (LO_ORDERDATE DIV 10000) AS YEAR,
     P_BRAND
 FROM lineorder_flat
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.2.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.2.sql
index 6a636f3a9e..9b7a5db502 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.2.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 --Q2.2
-SELECT
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
     SUM(LO_REVENUE), (LO_ORDERDATE DIV 10000) AS YEAR,
     P_BRAND
 FROM lineorder_flat
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.3.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.3.sql
index a2ef0c6df3..3a8a5e74d4 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.3.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 --Q2.3
-SELECT
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
     SUM(LO_REVENUE), (LO_ORDERDATE DIV 10000) AS YEAR,
     P_BRAND
 FROM lineorder_flat
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.1.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.1.sql
index 8df98222c4..6b3257f1f3 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.1.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 --Q3.1
-SELECT
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
     C_NATION,
     S_NATION, (LO_ORDERDATE DIV 10000) AS YEAR,
     SUM(LO_REVENUE) AS revenue
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.2.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.2.sql
index c588b5bbce..fefe727da8 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.2.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 --Q3.2
-SELECT
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
     C_CITY,
     S_CITY, (LO_ORDERDATE DIV 10000) AS YEAR,
     SUM(LO_REVENUE) AS revenue
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.3.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.3.sql
index 9a099d1732..c4560b701e 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.3.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 --Q3.3
-SELECT
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
     C_CITY,
     S_CITY, (LO_ORDERDATE DIV 10000) AS YEAR,
     SUM(LO_REVENUE) AS revenue
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.4.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.4.sql
index 6bd71b5891..4ae5d956e4 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.4.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.4.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 --Q3.4
-SELECT
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
     C_CITY,
     S_CITY, (LO_ORDERDATE DIV 10000) AS YEAR,
     SUM(LO_REVENUE) AS revenue
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.1.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.1.sql
index aedd0e047e..87b29bf160 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.1.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 --Q4.1
-SELECT (LO_ORDERDATE DIV 10000) AS YEAR,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ (LO_ORDERDATE 
DIV 10000) AS YEAR,
     C_NATION,
     SUM(LO_REVENUE - LO_SUPPLYCOST) AS profit
 FROM lineorder_flat
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.2.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.2.sql
index b9891ee408..8ea28f3f12 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.2.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 --Q4.2
-SELECT (LO_ORDERDATE DIV 10000) AS YEAR,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ (LO_ORDERDATE 
DIV 10000) AS YEAR,
     S_NATION,
     P_CATEGORY,
     SUM(LO_REVENUE - LO_SUPPLYCOST) AS profit
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.3.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.3.sql
index 6871023137..0f7c7401ab 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.3.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 --Q4.3
-SELECT (LO_ORDERDATE DIV 10000) AS YEAR,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ (LO_ORDERDATE 
DIV 10000) AS YEAR,
     S_CITY,
     P_BRAND,
     SUM(LO_REVENUE - LO_SUPPLYCOST) AS profit


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to