yangzhg commented on a change in pull request #3584: URL: https://github.com/apache/incubator-doris/pull/3584#discussion_r426341509
########## File path: be/src/exec/parquet_writer.cpp ########## @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/parquet_writer.h" + +#include <time.h> +#include <arrow/status.h> +#include <arrow/array.h> + +#include "exec/file_writer.h" +#include "common/logging.h" +#include "gen_cpp/PaloBrokerService_types.h" +#include "gen_cpp/TPaloBrokerService.h" +#include "runtime/broker_mgr.h" +#include "runtime/client_cache.h" +#include "runtime/exec_env.h" +#include "runtime/tuple.h" +#include "runtime/descriptors.h" +#include "runtime/mem_pool.h" +#include "util/thrift_util.h" + +namespace doris { + +/// ParquetOutputStream +ParquetOutputStream::ParquetOutputStream(FileWriter* file_writer): _file_writer(file_writer) { + set_mode(arrow::io::FileMode::WRITE); +} + +ParquetOutputStream::~ParquetOutputStream() { + Close(); +} + +arrow::Status ParquetOutputStream::Write(const void* data, int64_t nbytes) { + size_t written_len = 0; + Status st = _file_writer->write(reinterpret_cast<const uint8_t*>(data), nbytes, &written_len); + if (!st.ok()) { Review comment: why not `ARROW_RETURN_NOT_OK(st)` ########## File path: be/src/exec/parquet_writer.cpp ########## @@ -0,0 +1,91 @@ +// Licensed to the 
Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/parquet_writer.h" + +#include <time.h> +#include <arrow/status.h> +#include <arrow/array.h> + +#include "exec/file_writer.h" +#include "common/logging.h" +#include "gen_cpp/PaloBrokerService_types.h" +#include "gen_cpp/TPaloBrokerService.h" +#include "runtime/broker_mgr.h" +#include "runtime/client_cache.h" +#include "runtime/exec_env.h" +#include "runtime/tuple.h" +#include "runtime/descriptors.h" +#include "runtime/mem_pool.h" +#include "util/thrift_util.h" + +namespace doris { + +/// ParquetOutputStream +ParquetOutputStream::ParquetOutputStream(FileWriter* file_writer): _file_writer(file_writer) { + set_mode(arrow::io::FileMode::WRITE); +} + +ParquetOutputStream::~ParquetOutputStream() { + Close(); +} + +arrow::Status ParquetOutputStream::Write(const void* data, int64_t nbytes) { + size_t written_len = 0; + Status st = _file_writer->write(reinterpret_cast<const uint8_t*>(data), nbytes, &written_len); + if (!st.ok()) { + return arrow::Status::IOError(st.get_error_msg()); + } + _cur_pos += written_len; + return arrow::Status::OK(); +} + +arrow::Status ParquetOutputStream::Tell(int64_t* position) const { + *position = _cur_pos; + return arrow::Status::OK(); +} + 
+arrow::Status ParquetOutputStream::Close() { + Status st = _file_writer->close(); + if (!st.ok()) { Review comment: why not `ARROW_RETURN_NOT_OK(st)` ########## File path: be/src/runtime/result_sink.cpp ########## @@ -35,6 +36,17 @@ ResultSink::ResultSink(const RowDescriptor& row_desc, const std::vector<TExpr>& : _row_desc(row_desc), _t_output_expr(t_output_expr), _buf_size(buffer_size) { + + if (!sink.__isset.type || sink.type == TResultSinkType::MYSQL_PROTOCAL) { Review comment: || sink.type == TResultSinkType::MYSQL_PROTOCAL is unnecessary ########## File path: docs/zh-CN/administrator-guide/outfile.md ########## @@ -0,0 +1,183 @@ +--- +{ + "title": "查询结果集导出", Review comment: `导出查询结果集` 更好一点 ########## File path: be/src/runtime/file_result_writer.cpp ########## @@ -0,0 +1,319 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime/file_result_writer.h" + +#include "exec/broker_writer.h" +#include "exec/local_file_writer.h" +#include "exec/parquet_writer.h" +#include "exprs/expr.h" +#include "runtime/primitive_type.h" +#include "runtime/row_batch.h" +#include "runtime/tuple_row.h" +#include "runtime/runtime_state.h" +#include "util/types.h" +#include "util/date_func.h" +#include "util/uid_util.h" + +#include "gen_cpp/PaloInternalService_types.h" + +namespace doris { + +const size_t FileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024; + +FileResultWriter::FileResultWriter( + const ResultFileOptions* file_opts, + const std::vector<ExprContext*>& output_expr_ctxs, + RuntimeProfile* parent_profile) : + _file_opts(file_opts), + _output_expr_ctxs(output_expr_ctxs), + _parent_profile(parent_profile) { +} + +FileResultWriter::~FileResultWriter() { + close(); +} + +Status FileResultWriter::init(RuntimeState* state) { + _state = state; + _init_profile(); + + RETURN_IF_ERROR(_create_file_writer()); + return Status::OK(); +} + +void FileResultWriter::_init_profile() { + RuntimeProfile* profile = _parent_profile->create_child("FileResultWriter", true, true); + _append_row_batch_timer = ADD_TIMER(profile, "AppendBatchTime"); + _convert_tuple_timer = ADD_CHILD_TIMER(profile, "TupleConvertTime", "AppendBatchTime"); + _file_write_timer = ADD_CHILD_TIMER(profile, "FileWriteTime", "AppendBatchTime"); + _writer_close_timer = ADD_TIMER(profile, "FileWriterCloseTime"); + _written_rows_counter = ADD_COUNTER(profile, "NumWrittenRows", TUnit::UNIT); + _written_data_bytes = ADD_COUNTER(profile, "WrittenDataBytes", TUnit::BYTES); +} + +Status FileResultWriter::_create_file_writer() { + std::string file_name = _get_next_file_name(); + if (_file_opts->is_local_file) { + _file_writer = new LocalFileWriter(file_name, 0 /* start offset */); + } else { + _file_writer = new BrokerWriter(_state->exec_env(), + _file_opts->broker_addresses, + _file_opts->broker_properties, + file_name, + 0 /*start 
offset*/); + } + RETURN_IF_ERROR(_file_writer->open()); + + switch (_file_opts->file_format) { + case TFileFormatType::FORMAT_CSV_PLAIN: + // just use file writer is enough + break; + case TFileFormatType::FORMAT_PARQUET: + _parquet_writer = new ParquetWriterWrapper(_file_writer, _output_expr_ctxs); Review comment: `_file_writer` seems to be owned by `ParquetWriterWrapper` but is not released in `ParquetOutputStream` or `ParquetWriterWrapper`. How about creating and owning it in `ParquetWriterWrapper`, and using `unique_ptr` to handle `_file_writer` and `_parquet_writer` in this class? ########## File path: be/src/runtime/file_result_writer.cpp ########## @@ -0,0 +1,319 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime/file_result_writer.h" + +#include "exec/broker_writer.h" +#include "exec/local_file_writer.h" +#include "exec/parquet_writer.h" +#include "exprs/expr.h" +#include "runtime/primitive_type.h" +#include "runtime/row_batch.h" +#include "runtime/tuple_row.h" +#include "runtime/runtime_state.h" +#include "util/types.h" +#include "util/date_func.h" +#include "util/uid_util.h" + +#include "gen_cpp/PaloInternalService_types.h" + +namespace doris { + +const size_t FileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024; + +FileResultWriter::FileResultWriter( + const ResultFileOptions* file_opts, + const std::vector<ExprContext*>& output_expr_ctxs, + RuntimeProfile* parent_profile) : + _file_opts(file_opts), + _output_expr_ctxs(output_expr_ctxs), + _parent_profile(parent_profile) { +} + +FileResultWriter::~FileResultWriter() { + close(); +} + +Status FileResultWriter::init(RuntimeState* state) { + _state = state; + _init_profile(); + + RETURN_IF_ERROR(_create_file_writer()); + return Status::OK(); +} + +void FileResultWriter::_init_profile() { + RuntimeProfile* profile = _parent_profile->create_child("FileResultWriter", true, true); + _append_row_batch_timer = ADD_TIMER(profile, "AppendBatchTime"); + _convert_tuple_timer = ADD_CHILD_TIMER(profile, "TupleConvertTime", "AppendBatchTime"); + _file_write_timer = ADD_CHILD_TIMER(profile, "FileWriteTime", "AppendBatchTime"); + _writer_close_timer = ADD_TIMER(profile, "FileWriterCloseTime"); + _written_rows_counter = ADD_COUNTER(profile, "NumWrittenRows", TUnit::UNIT); + _written_data_bytes = ADD_COUNTER(profile, "WrittenDataBytes", TUnit::BYTES); +} + +Status FileResultWriter::_create_file_writer() { + std::string file_name = _get_next_file_name(); + if (_file_opts->is_local_file) { + _file_writer = new LocalFileWriter(file_name, 0 /* start offset */); + } else { + _file_writer = new BrokerWriter(_state->exec_env(), + _file_opts->broker_addresses, + _file_opts->broker_properties, + file_name, + 0 /*start 
offset*/); + } + RETURN_IF_ERROR(_file_writer->open()); + + switch (_file_opts->file_format) { + case TFileFormatType::FORMAT_CSV_PLAIN: + // just use file writer is enough + break; + case TFileFormatType::FORMAT_PARQUET: + _parquet_writer = new ParquetWriterWrapper(_file_writer, _output_expr_ctxs); + break; + default: + return Status::InternalError(strings::Substitute("unsupport file format: $0", _file_opts->file_format)); + } + LOG(INFO) << "create file for exporting query result. file name: " << file_name + << ". query id: " << print_id(_state->query_id()); + return Status::OK(); +} + +// file name format as: my_prefix_0.csv +std::string FileResultWriter::_get_next_file_name() { + std::stringstream ss; + ss << _file_opts->file_path << "_" << (_file_idx++) << "." << _file_format_to_name(); + return ss.str(); +} + +std::string FileResultWriter::_file_format_to_name() { + switch (_file_opts->file_format) { + case TFileFormatType::FORMAT_CSV_PLAIN: + return "csv"; + case TFileFormatType::FORMAT_PARQUET: + return "parquet"; + default: + return "unknown"; + } +} + +Status FileResultWriter::append_row_batch(const RowBatch* batch) { + if (NULL == batch || 0 == batch->num_rows()) { Review comment: why not nullptr ########## File path: be/src/runtime/file_result_writer.cpp ########## @@ -0,0 +1,319 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/file_result_writer.h" + +#include "exec/broker_writer.h" +#include "exec/local_file_writer.h" +#include "exec/parquet_writer.h" +#include "exprs/expr.h" +#include "runtime/primitive_type.h" +#include "runtime/row_batch.h" +#include "runtime/tuple_row.h" +#include "runtime/runtime_state.h" +#include "util/types.h" +#include "util/date_func.h" +#include "util/uid_util.h" + +#include "gen_cpp/PaloInternalService_types.h" + +namespace doris { + +const size_t FileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024; + +FileResultWriter::FileResultWriter( + const ResultFileOptions* file_opts, + const std::vector<ExprContext*>& output_expr_ctxs, + RuntimeProfile* parent_profile) : + _file_opts(file_opts), + _output_expr_ctxs(output_expr_ctxs), + _parent_profile(parent_profile) { +} + +FileResultWriter::~FileResultWriter() { + close(); +} + +Status FileResultWriter::init(RuntimeState* state) { + _state = state; + _init_profile(); + + RETURN_IF_ERROR(_create_file_writer()); + return Status::OK(); +} + +void FileResultWriter::_init_profile() { + RuntimeProfile* profile = _parent_profile->create_child("FileResultWriter", true, true); + _append_row_batch_timer = ADD_TIMER(profile, "AppendBatchTime"); + _convert_tuple_timer = ADD_CHILD_TIMER(profile, "TupleConvertTime", "AppendBatchTime"); + _file_write_timer = ADD_CHILD_TIMER(profile, "FileWriteTime", "AppendBatchTime"); + _writer_close_timer = ADD_TIMER(profile, "FileWriterCloseTime"); + _written_rows_counter = ADD_COUNTER(profile, "NumWrittenRows", TUnit::UNIT); + _written_data_bytes 
= ADD_COUNTER(profile, "WrittenDataBytes", TUnit::BYTES); +} + +Status FileResultWriter::_create_file_writer() { + std::string file_name = _get_next_file_name(); + if (_file_opts->is_local_file) { + _file_writer = new LocalFileWriter(file_name, 0 /* start offset */); + } else { + _file_writer = new BrokerWriter(_state->exec_env(), + _file_opts->broker_addresses, + _file_opts->broker_properties, + file_name, + 0 /*start offset*/); + } + RETURN_IF_ERROR(_file_writer->open()); + + switch (_file_opts->file_format) { + case TFileFormatType::FORMAT_CSV_PLAIN: + // just use file writer is enough + break; + case TFileFormatType::FORMAT_PARQUET: + _parquet_writer = new ParquetWriterWrapper(_file_writer, _output_expr_ctxs); + break; + default: + return Status::InternalError(strings::Substitute("unsupport file format: $0", _file_opts->file_format)); + } + LOG(INFO) << "create file for exporting query result. file name: " << file_name + << ". query id: " << print_id(_state->query_id()); + return Status::OK(); +} + +// file name format as: my_prefix_0.csv +std::string FileResultWriter::_get_next_file_name() { + std::stringstream ss; + ss << _file_opts->file_path << "_" << (_file_idx++) << "." 
<< _file_format_to_name(); + return ss.str(); +} + +std::string FileResultWriter::_file_format_to_name() { + switch (_file_opts->file_format) { + case TFileFormatType::FORMAT_CSV_PLAIN: + return "csv"; + case TFileFormatType::FORMAT_PARQUET: + return "parquet"; + default: + return "unknown"; + } +} + +Status FileResultWriter::append_row_batch(const RowBatch* batch) { + if (NULL == batch || 0 == batch->num_rows()) { + return Status::OK(); + } + + SCOPED_TIMER(_append_row_batch_timer); + if (_parquet_writer != nullptr) { + RETURN_IF_ERROR(_parquet_writer->write(*batch)); + } else { + RETURN_IF_ERROR(_write_csv_file(*batch)); + } + + _written_rows += batch->num_rows(); + return Status::OK(); +} + +Status FileResultWriter::_write_csv_file(const RowBatch& batch) { + int num_rows = batch.num_rows(); + for (int i = 0; i < num_rows; ++i) { + TupleRow* row = batch.get_row(i); + RETURN_IF_ERROR(_write_one_row_as_csv(row)); + } + _flush_plain_text_outstream(true); + return Status::OK(); +} + +// actually, this logic is same as `ExportSink::gen_row_buffer` +// TODO(cmy): find a way to unify them. 
+Status FileResultWriter::_write_one_row_as_csv(TupleRow* row) { + { + SCOPED_TIMER(_convert_tuple_timer); + int num_columns = _output_expr_ctxs.size(); + for (int i = 0; i < num_columns; ++i) { + void* item = _output_expr_ctxs[i]->get_value(row); + + if (item == nullptr) { + _plain_text_outstream << NULL_IN_CSV; + continue; + } + + switch (_output_expr_ctxs[i]->root()->type().type) { + case TYPE_BOOLEAN: + case TYPE_TINYINT: + _plain_text_outstream << (int)*static_cast<int8_t*>(item); + break; + case TYPE_SMALLINT: + _plain_text_outstream << *static_cast<int16_t*>(item); + break; + case TYPE_INT: + _plain_text_outstream << *static_cast<int32_t*>(item); + break; + case TYPE_BIGINT: + _plain_text_outstream << *static_cast<int64_t*>(item); + break; + case TYPE_LARGEINT: + _plain_text_outstream << reinterpret_cast<PackedInt128*>(item)->value; + break; + case TYPE_FLOAT: { + char buffer[MAX_FLOAT_STR_LENGTH + 2]; + float float_value = *static_cast<float*>(item); + buffer[0] = '\0'; + int length = FloatToBuffer(float_value, MAX_FLOAT_STR_LENGTH, buffer); + DCHECK(length >= 0) << "gcvt float failed, float value=" << float_value; + _plain_text_outstream << buffer; + break; + } + case TYPE_DOUBLE: { + // To prevent loss of precision on float and double types, + // they are converted to strings before output. 
+ // For example: For a double value 27361919854.929001, + // the direct output of using std::stringstream is 2.73619e+10, + // and after conversion to a string, it outputs 27361919854.929001 + char buffer[MAX_DOUBLE_STR_LENGTH + 2]; + double double_value = *static_cast<double*>(item); + buffer[0] = '\0'; + int length = DoubleToBuffer(double_value, MAX_DOUBLE_STR_LENGTH, buffer); + DCHECK(length >= 0) << "gcvt double failed, double value=" << double_value; + _plain_text_outstream << buffer; + break; + } + case TYPE_DATE: + case TYPE_DATETIME: { + char buf[64]; + const DateTimeValue* time_val = (const DateTimeValue*)(item); + time_val->to_string(buf); + _plain_text_outstream << buf; + break; + } + case TYPE_VARCHAR: + case TYPE_CHAR: { + const StringValue* string_val = (const StringValue*)(item); + if (string_val->ptr == NULL) { + if (string_val->len != 0) { + _plain_text_outstream << NULL_IN_CSV; + } + } else { + _plain_text_outstream << std::string(string_val->ptr, string_val->len); + } + break; + } + case TYPE_DECIMAL: { + const DecimalValue* decimal_val = reinterpret_cast<const DecimalValue*>(item); + std::string decimal_str; + int output_scale = _output_expr_ctxs[i]->root()->output_scale(); + if (output_scale > 0 && output_scale <= 30) { + decimal_str = decimal_val->to_string(output_scale); + } else { + decimal_str = decimal_val->to_string(); + } + _plain_text_outstream << decimal_str; + break; + } + case TYPE_DECIMALV2: { + const DecimalV2Value decimal_val(reinterpret_cast<const PackedInt128*>(item)->value); + std::string decimal_str; + int output_scale = _output_expr_ctxs[i]->root()->output_scale(); + if (output_scale > 0 && output_scale <= 30) { + decimal_str = decimal_val.to_string(output_scale); + } else { + decimal_str = decimal_val.to_string(); + } + _plain_text_outstream << decimal_str; + break; + } + default: { + // not supported type, like BITMAP, HLL, just export null + _plain_text_outstream << NULL_IN_CSV; + } + } + if (i < num_columns - 1) { + 
_plain_text_outstream << _file_opts->column_separator; + } + } // end for columns + _plain_text_outstream << _file_opts->line_delimiter; + } + + // write one line to file + return _flush_plain_text_outstream(false); +} + +Status FileResultWriter::_flush_plain_text_outstream(bool eos) { + SCOPED_TIMER(_file_write_timer); + size_t pos = _plain_text_outstream.tellp(); + if (pos == 0 || (pos < OUTSTREAM_BUFFER_SIZE_BYTES && !eos)) { + return Status::OK(); + } + + const std::string& buf = _plain_text_outstream.str(); + size_t written_len = 0; + RETURN_IF_ERROR(_file_writer->write(reinterpret_cast<const uint8_t*>(buf.c_str()), + buf.size(), &written_len)); + COUNTER_UPDATE(_written_data_bytes, written_len); + _current_written_bytes += written_len; + + // clear the stream + _plain_text_outstream.str(""); + _plain_text_outstream.clear(); + + // split file if exceed limit + RETURN_IF_ERROR(_create_new_file_if_exceed_size()); + + return Status::OK(); +} + +Status FileResultWriter::_create_new_file_if_exceed_size() { + if (_current_written_bytes < _file_opts->max_file_size_bytes) { + return Status::OK(); + } + // current file size exceed the max file size. close this file + // and create new one + RETURN_IF_ERROR(_close_file_writer(false)); + _current_written_bytes = 0; + return Status::OK(); +} + +Status FileResultWriter::_close_file_writer(bool done) { + SCOPED_TIMER(_writer_close_timer); + if (_parquet_writer != nullptr) { + _parquet_writer->close(); + delete _parquet_writer; + _parquet_writer = nullptr; + if (!done) { + Review comment: ???? ########## File path: be/src/runtime/mysql_result_writer.h ########## @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "runtime/result_writer.h" +#include "runtime/runtime_state.h" + +namespace doris { + +class TupleRow; +class RowBatch; +class ExprContext; +class MysqlRowBuffer; +class BufferControlBlock; +class RuntimeProfile; + +// convert the row batch to mysql protol row +class MysqlResultWriter : public ResultWriter { +public: + MysqlResultWriter(BufferControlBlock* sinker, + const std::vector<ExprContext*>& output_expr_ctxs, + RuntimeProfile* parent_profile); + virtual ~MysqlResultWriter(); + + virtual Status init(RuntimeState* state) override; + // convert one row batch to mysql result and + // append this batch to the result sink + virtual Status append_row_batch(const RowBatch* batch) override; + + virtual Status close() override; + +private: + void _init_profile(); + +private: + // convert one tuple row + Status add_one_row(TupleRow* row); Review comment: why two private, and `add_one_row` not start with `_` ########## File path: be/src/runtime/mysql_result_writer.h ########## @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "runtime/result_writer.h" +#include "runtime/runtime_state.h" + +namespace doris { + +class TupleRow; +class RowBatch; +class ExprContext; +class MysqlRowBuffer; +class BufferControlBlock; +class RuntimeProfile; + +// convert the row batch to mysql protol row +class MysqlResultWriter : public ResultWriter { +public: + MysqlResultWriter(BufferControlBlock* sinker, + const std::vector<ExprContext*>& output_expr_ctxs, + RuntimeProfile* parent_profile); + virtual ~MysqlResultWriter(); + + virtual Status init(RuntimeState* state) override; + // convert one row batch to mysql result and + // append this batch to the result sink + virtual Status append_row_batch(const RowBatch* batch) override; + + virtual Status close() override; + +private: + void _init_profile(); + +private: + // convert one tuple row + Status add_one_row(TupleRow* row); + + // The expressions that are run to create tuples to be written to hbase. + BufferControlBlock* _sinker; + const std::vector<ExprContext*>& _output_expr_ctxs; + MysqlRowBuffer* _row_buffer; Review comment: why not use unique_ptr ########## File path: build.sh ########## @@ -42,6 +42,7 @@ if [[ ! -f ${DORIS_THIRDPARTY}/installed/lib/libs2.a ]]; then fi PARALLEL=$[$(nproc)/4+1] +PARALLEL=12 Review comment: forgot to remove this? 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
