github-actions[bot] commented on code in PR #24793:
URL: https://github.com/apache/doris/pull/24793#discussion_r1334227917


##########
be/src/pipeline/exec/result_file_sink_operator.cpp:
##########
@@ -38,4 +43,227 @@ OperatorPtr ResultFileSinkOperatorBuilder::build_operator() 
{
 ResultFileSinkOperator::ResultFileSinkOperator(OperatorBuilderBase* 
operator_builder,
                                                DataSink* sink)
         : DataSinkOperator(operator_builder, sink) {};
-} // namespace doris::pipeline
\ No newline at end of file
+
+ResultFileSinkLocalState::ResultFileSinkLocalState(DataSinkOperatorXBase* 
parent,
+                                                   RuntimeState* state)
+        : AsyncWriterSink<vectorized::VFileResultWriter>(
+                  parent, state, 
parent->cast<ResultFileSinkOperatorX>()._row_desc,
+                  parent->cast<ResultFileSinkOperatorX>()._t_output_expr),
+          _serializer(this) {}
+
+ResultFileSinkOperatorX::ResultFileSinkOperatorX(const RowDescriptor& row_desc,
+                                                 const std::vector<TExpr>& 
t_output_expr)
+        : DataSinkOperatorX(0),
+          _row_desc(row_desc),
+          _t_output_expr(t_output_expr),
+          _is_top_sink(true) {}
+
+ResultFileSinkOperatorX::ResultFileSinkOperatorX(
+        RuntimeState* state, ObjectPool* pool, int sender_id, const 
RowDescriptor& row_desc,
+        const TResultFileSink& sink, const 
std::vector<TPlanFragmentDestination>& destinations,
+        bool send_query_statistics_with_every_batch, const std::vector<TExpr>& 
t_output_expr,
+        DescriptorTbl& descs)
+        : DataSinkOperatorX(0),
+          _row_desc(row_desc),
+          _t_output_expr(t_output_expr),
+          _dests(destinations),
+          
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch),
+          
_output_row_descriptor(descs.get_tuple_descriptor(sink.output_tuple_id), false),
+          _is_top_sink(false) {
+    CHECK_EQ(destinations.size(), 1);
+}
+
+Status ResultFileSinkOperatorX::init(const TDataSink& tsink) {
+    auto& sink = tsink.result_file_sink;
+    CHECK(sink.__isset.file_options);
+    _file_opts.reset(new vectorized::ResultFileOptions(sink.file_options));
+    CHECK(sink.__isset.storage_backend_type);
+    _storage_type = sink.storage_backend_type;
+
+    //for impl csv_with_name and csv_with_names_and_types
+    _header_type = sink.header_type;
+    _header = sink.header;
+
+    return Status::OK();
+}
+
+Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& 
info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    SCOPED_TIMER(profile()->total_time_counter());
+    SCOPED_TIMER(_open_timer);
+    _sender_id = info.sender_id;
+
+    _brpc_wait_timer = ADD_TIMER(_profile, "BrpcSendTime.Wait");
+    auto& p = _parent->cast<ResultFileSinkOperatorX>();
+    RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, 
_output_vexpr_ctxs));
+    CHECK(p._file_opts.get() != nullptr);
+    if (p._is_top_sink) {
+        // create sender
+        RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
+                state->fragment_instance_id(), p._buf_size, &_sender, 
state->enable_pipeline_exec(),
+                state->execution_timeout()));
+        // create writer
+        _writer.reset(new (std::nothrow) vectorized::VFileResultWriter(
+                p._file_opts.get(), p._storage_type, 
state->fragment_instance_id(),
+                _output_vexpr_ctxs, _sender.get(), nullptr, 
state->return_object_data_as_binary(),
+                p._output_row_descriptor));
+    } else {
+        // init channel
+        _output_block = vectorized::Block::create_unique(
+                p._output_row_descriptor.tuple_descriptors()[0]->slots(), 1);
+        _writer.reset(new (std::nothrow) vectorized::VFileResultWriter(
+                p._file_opts.get(), p._storage_type, 
state->fragment_instance_id(),
+                _output_vexpr_ctxs, nullptr, _output_block.get(),
+                state->return_object_data_as_binary(), 
p._output_row_descriptor));
+
+        std::map<int64_t, int64_t> fragment_id_to_channel_index;
+        for (int i = 0; i < p._dests.size(); ++i) {
+            _channels.push_back(new vectorized::Channel(
+                    this, p._row_desc, p._dests[i].brpc_server, 
state->fragment_instance_id(),
+                    info.tsink.result_file_sink.dest_node_id, false,
+                    p._send_query_statistics_with_every_batch));
+        }
+        std::random_device rd;
+        std::mt19937 g(rd());
+        shuffle(_channels.begin(), _channels.end(), g);
+
+        int local_size = 0;
+        for (int i = 0; i < _channels.size(); ++i) {
+            RETURN_IF_ERROR(_channels[i]->init(state));
+            if (_channels[i]->is_local()) {
+                local_size++;
+            }
+        }
+        _only_local_exchange = local_size == _channels.size();
+    }
+    _writer->set_header_info(p._header_type, p._header);
+    return Status::OK();
+}
+
+Status ResultFileSinkLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(profile()->total_time_counter());
+    SCOPED_TIMER(_open_timer);
+    return Base::open(state);
+}
+
+Status ResultFileSinkLocalState::close(RuntimeState* state, Status 
exec_status) {

Review Comment:
   warning: method 'close' can be made const 
[readability-make-member-function-const]
   
   ```suggestion
   Status ResultFileSinkLocalState::close(RuntimeState* state, Status 
exec_status) const {
   ```
   
   be/src/pipeline/exec/result_file_sink_operator.h:55:
   ```diff
   -     Status close(RuntimeState* state, Status exec_status) override;
   +     Status close(RuntimeState* state, Status exec_status) const override;
   ```
   



##########
be/src/pipeline/exec/result_file_sink_operator.h:
##########
@@ -42,5 +43,77 @@ class ResultFileSinkOperator final : public 
DataSinkOperator<ResultFileSinkOpera
     bool can_write() override { return true; }
 };
 
+class ResultFileSinkOperatorX;
+class ResultSinkLocalState;
+class ResultFileSinkLocalState final : public 
AsyncWriterSink<vectorized::VFileResultWriter> {
+public:
+    using Base = AsyncWriterSink<vectorized::VFileResultWriter>;
+    ENABLE_FACTORY_CREATOR(ResultFileSinkLocalState);
+    ResultFileSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* 
state);
+
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    Status open(RuntimeState* state) override;
+    Status close(RuntimeState* state, Status exec_status) override;
+
+    int sender_id() { return _sender_id; }

Review Comment:
   warning: method 'sender_id' can be made const 
[readability-make-member-function-const]
   
   ```suggestion
       int sender_id() const { return _sender_id; }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to