This is an automated email from the ASF dual-hosted git repository. hubgeter pushed a commit to branch mc-test-branch-4.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 17685ce17e39fa7e905ef20255ee7c05893978c6 Author: daidai <[email protected]> AuthorDate: Mon May 25 18:02:27 2026 +0800 [test](log)add some log for debug maxcompute --- be/src/exec/pipeline/pipeline_fragment_context.cpp | 52 +++++ .../writer/maxcompute/vmc_partition_writer.cpp | 131 ++++++++++- .../sink/writer/maxcompute/vmc_table_writer.cpp | 193 +++++++++++++++- .../format/transformer/vjni_format_transformer.cpp | 255 ++++++++++++++++++--- .../org/apache/doris/common/jni/JniWriter.java | 45 +++- .../doris/maxcompute/MaxComputeFeClient.java | 68 +++++- .../doris/maxcompute/MaxComputeJniWriter.java | 213 ++++++++++++++++- .../doris/datasource/maxcompute/MCTransaction.java | 104 +++++++++ .../plans/commands/insert/MCInsertExecutor.java | 41 ++++ .../apache/doris/planner/MaxComputeTableSink.java | 17 ++ .../main/java/org/apache/doris/qe/Coordinator.java | 15 +- .../org/apache/doris/qe/runtime/LoadProcessor.java | 15 ++ .../apache/doris/service/FrontendServiceImpl.java | 26 +++ 13 files changed, 1123 insertions(+), 52 deletions(-) diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp index c8f83ad0781..9d13a8bcaf4 100644 --- a/be/src/exec/pipeline/pipeline_fragment_context.cpp +++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp @@ -2168,15 +2168,43 @@ void PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& r if (auto mcd = req.runtime_state->mc_commit_datas(); !mcd.empty()) { params.__isset.mc_commit_datas = true; params.mc_commit_datas.insert(params.mc_commit_datas.end(), mcd.begin(), mcd.end()); + LOG(INFO) << "MC_DIAG stage=BE_REPORT_MC_COMMIT_DATA_FROM_PRIMARY" + << ", query_id=" << print_id(req.query_id) + << ", fragment_instance_id=" << print_id(req.fragment_instance_id) + << ", commit_datas=" << mcd.size(); } else if (!req.runtime_states.empty()) { for (auto* rs : req.runtime_states) { if (auto rs_mcd = rs->mc_commit_datas(); !rs_mcd.empty()) { params.__isset.mc_commit_datas = true; params.mc_commit_datas.insert(params.mc_commit_datas.end(), rs_mcd.begin(), rs_mcd.end()); + LOG(INFO) << "MC_DIAG stage=BE_REPORT_MC_COMMIT_DATA_FROM_RUNTIME_STATE" + << ", query_id=" << print_id(req.query_id) + << ", fragment_instance_id=" << print_id(req.fragment_instance_id) + << ", commit_datas=" << rs_mcd.size(); } } } + if (params.__isset.mc_commit_datas) { + int64_t mc_rows = 0; + int64_t mc_commit_messages = 0; + int64_t mc_commit_message_bytes = 0; + for (const auto& data : params.mc_commit_datas) { + if (data.__isset.row_count) { + mc_rows += data.row_count; + } + if (data.__isset.commit_message && !data.commit_message.empty()) { + mc_commit_messages++; + mc_commit_message_bytes += data.commit_message.size(); + } + } + LOG(INFO) << "MC_DIAG stage=BE_REPORT_MC_COMMIT_DATA_READY" + << ", query_id=" << print_id(req.query_id) + << ", fragment_instance_id=" << print_id(req.fragment_instance_id) + << ", commit_datas=" << params.mc_commit_datas.size() + << ", rows=" << mc_rows << ", commit_messages=" << mc_commit_messages + << ", commit_message_bytes=" << mc_commit_message_bytes; + } req.runtime_state->get_unreported_errors(&(params.error_log)); params.__isset.error_log = (!params.error_log.empty()); @@ -2197,7 +2225,19 @@ void PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& r } try { try { + LOG(INFO) << "MC_DIAG stage=BE_REPORT_EXEC_STATUS_BEFORE" + << ", query_id=" << print_id(req.query_id) + << ", fragment_instance_id=" << print_id(req.fragment_instance_id) + << ", has_mc_commit_datas=" << params.__isset.mc_commit_datas + << ", mc_commit_datas=" + << (params.__isset.mc_commit_datas ? params.mc_commit_datas.size() : 0); (*coord)->reportExecStatus(res, params); + LOG(INFO) << "MC_DIAG stage=BE_REPORT_EXEC_STATUS_AFTER" + << ", query_id=" << print_id(req.query_id) + << ", fragment_instance_id=" << print_id(req.fragment_instance_id) + << ", has_mc_commit_datas=" << params.__isset.mc_commit_datas + << ", mc_commit_datas=" + << (params.__isset.mc_commit_datas ? params.mc_commit_datas.size() : 0); } catch ([[maybe_unused]] apache::thrift::transport::TTransportException& e) { #ifndef ADDRESS_SANITIZER LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(req.query_id) @@ -2210,7 +2250,19 @@ void PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& r req.cancel_fn(rpc_status); return; } + LOG(INFO) << "MC_DIAG stage=BE_REPORT_EXEC_STATUS_RETRY_BEFORE" + << ", query_id=" << print_id(req.query_id) + << ", fragment_instance_id=" << print_id(req.fragment_instance_id) + << ", has_mc_commit_datas=" << params.__isset.mc_commit_datas + << ", mc_commit_datas=" + << (params.__isset.mc_commit_datas ? params.mc_commit_datas.size() : 0); (*coord)->reportExecStatus(res, params); + LOG(INFO) << "MC_DIAG stage=BE_REPORT_EXEC_STATUS_RETRY_AFTER" + << ", query_id=" << print_id(req.query_id) + << ", fragment_instance_id=" << print_id(req.fragment_instance_id) + << ", has_mc_commit_datas=" << params.__isset.mc_commit_datas + << ", mc_commit_datas=" + << (params.__isset.mc_commit_datas ? params.mc_commit_datas.size() : 0); } rpc_status = Status::create<false>(res.status); diff --git a/be/src/exec/sink/writer/maxcompute/vmc_partition_writer.cpp b/be/src/exec/sink/writer/maxcompute/vmc_partition_writer.cpp index ef1e6f069da..349df0e7067 100644 --- a/be/src/exec/sink/writer/maxcompute/vmc_partition_writer.cpp +++ b/be/src/exec/sink/writer/maxcompute/vmc_partition_writer.cpp @@ -19,9 +19,20 @@ #include "format/transformer/vjni_format_transformer.h" #include "runtime/runtime_state.h" +#include "util/time.h" +#include "util/uid_util.h" namespace doris { +namespace { + +std::string mc_diag_param(const std::map<std::string, std::string>& params, const std::string& key) { + auto it = params.find(key); + return it == params.end() ? "" : it->second; +} + +} // namespace + VMCPartitionWriter::VMCPartitionWriter(RuntimeState* state, const VExprContextSPtrs& output_vexpr_ctxs, const std::string& partition_spec, @@ -32,30 +43,120 @@ VMCPartitionWriter::VMCPartitionWriter(RuntimeState* state, _writer_params(std::move(writer_params)) {} Status VMCPartitionWriter::open() { + int64_t start_ms = MonotonicMillis(); + LOG(INFO) << "MC_DIAG stage=BE_PARTITION_WRITER_OPEN_ENTER" + << ", fragment_instance_id=" << print_id(_state->fragment_instance_id()) + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id") + << ", partition_spec=" << _partition_spec; _jni_format_transformer = std::make_unique<VJniFormatTransformer>( _state, _output_vexpr_ctxs, "org/apache/doris/maxcompute/MaxComputeJniWriter", _writer_params); - return _jni_format_transformer->open(); + LOG(INFO) << "MC_DIAG stage=BE_PARTITION_JNI_OPEN_BEFORE" + << ", fragment_instance_id=" << print_id(_state->fragment_instance_id()) + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id") + << ", partition_spec=" << _partition_spec; + Status status = _jni_format_transformer->open(); + LOG(INFO) << "MC_DIAG stage=BE_PARTITION_JNI_OPEN_AFTER" + << ", fragment_instance_id=" << print_id(_state->fragment_instance_id()) + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id") + << ", partition_spec=" << _partition_spec << ", status=" << status.to_string() + << ", cost_ms=" << MonotonicMillis() - start_ms; + return status; } Status VMCPartitionWriter::write(Block& block) { - RETURN_IF_ERROR(_jni_format_transformer->write(block)); + int64_t start_ms = MonotonicMillis(); + LOG(INFO) << "MC_DIAG stage=BE_PARTITION_WRITE_BEFORE" + << ", fragment_instance_id=" << print_id(_state->fragment_instance_id()) + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id") + << ", partition_spec=" << _partition_spec << ", rows=" << block.rows() + << ", accumulated_rows=" << _row_count; + Status status = _jni_format_transformer->write(block); + LOG(INFO) << "MC_DIAG stage=BE_PARTITION_WRITE_AFTER" + << ", fragment_instance_id=" << print_id(_state->fragment_instance_id()) + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id") + << ", partition_spec=" << _partition_spec << ", rows=" << block.rows() + << ", status=" << status.to_string() + << ", cost_ms=" << MonotonicMillis() - start_ms; + RETURN_IF_ERROR(status); _row_count += block.rows(); return Status::OK(); } Status VMCPartitionWriter::close(const Status& status) { + int64_t start_ms = MonotonicMillis(); + LOG(INFO) << "MC_DIAG stage=BE_PARTITION_CLOSE_ENTER" + << ", fragment_instance_id=" << print_id(_state->fragment_instance_id()) + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id") + << ", partition_spec=" << _partition_spec << ", input_status=" << status.to_string() + << ", accumulated_rows=" << _row_count; Status result_status; if (_jni_format_transformer) { + LOG(INFO) << "MC_DIAG stage=BE_PARTITION_JNI_CLOSE_BEFORE" + << ", fragment_instance_id=" << print_id(_state->fragment_instance_id()) + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id") + << ", partition_spec=" << _partition_spec; result_status = _jni_format_transformer->close(); + LOG(INFO) << "MC_DIAG stage=BE_PARTITION_JNI_CLOSE_AFTER" + << ", fragment_instance_id=" << print_id(_state->fragment_instance_id()) + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id") + << ", partition_spec=" << _partition_spec + << ", status=" << result_status.to_string() + << ", cost_ms=" << MonotonicMillis() - start_ms; if (!result_status.ok()) { LOG(WARNING) << "VMCPartitionWriter close failed: " << result_status.to_string(); } } if (result_status.ok() && status.ok()) { + LOG(INFO) << "MC_DIAG stage=BE_PARTITION_BUILD_COMMIT_DATA_BEFORE" + << ", fragment_instance_id=" << print_id(_state->fragment_instance_id()) + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id") + << ", partition_spec=" << _partition_spec; auto commit_data = _build_mc_commit_data(); + LOG(INFO) << "MC_DIAG stage=BE_PARTITION_ADD_COMMIT_DATA_BEFORE" + << ", fragment_instance_id=" << print_id(_state->fragment_instance_id()) + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id") + << ", partition_spec=" << _partition_spec + << ", row_count=" << commit_data.row_count + << ", has_commit_message=" << commit_data.__isset.commit_message + << ", commit_message_length=" + << (commit_data.__isset.commit_message ? commit_data.commit_message.size() : 0) + << ", has_written_bytes=" << commit_data.__isset.written_bytes; _state->add_mc_commit_datas(commit_data); + LOG(INFO) << "MC_DIAG stage=BE_PARTITION_ADD_COMMIT_DATA_AFTER" + << ", fragment_instance_id=" << print_id(_state->fragment_instance_id()) + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id") + << ", partition_spec=" << _partition_spec; } + LOG(INFO) << "MC_DIAG stage=BE_PARTITION_CLOSE_EXIT" + << ", fragment_instance_id=" << print_id(_state->fragment_instance_id()) + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id") + << ", partition_spec=" << _partition_spec << ", status=" << result_status.to_string() + << ", cost_ms=" << MonotonicMillis() - start_ms; return result_status; } @@ -66,7 +167,22 @@ TMCCommitData VMCPartitionWriter::_build_mc_commit_data() { // Get statistics from Java side via JNI getStatistics() if (_jni_format_transformer) { + int64_t start_ms = MonotonicMillis(); + LOG(INFO) << "MC_DIAG stage=BE_PARTITION_GET_STATS_BEFORE" + << ", fragment_instance_id=" << print_id(_state->fragment_instance_id()) + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id") + << ", partition_spec=" << _partition_spec; auto statistics = _jni_format_transformer->get_statistics(); + LOG(INFO) << "MC_DIAG stage=BE_PARTITION_GET_STATS_AFTER" + << ", fragment_instance_id=" << print_id(_state->fragment_instance_id()) + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id") + << ", partition_spec=" << _partition_spec + << ", stats_size=" << statistics.size() + << ", cost_ms=" << MonotonicMillis() - start_ms; auto it = statistics.find("mc_commit_message"); if (it != statistics.end() && !it->second.empty()) { commit_data.__set_commit_message(it->second); @@ -76,6 +192,17 @@ TMCCommitData VMCPartitionWriter::_build_mc_commit_data() { commit_data.__set_written_bytes(std::stoll(it->second)); } } + LOG(INFO) << "MC_DIAG stage=BE_PARTITION_BUILD_COMMIT_DATA_AFTER" + << ", fragment_instance_id=" << print_id(_state->fragment_instance_id()) + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id") + << ", partition_spec=" << _partition_spec + << ", row_count=" << commit_data.row_count + << ", has_commit_message=" << commit_data.__isset.commit_message + << ", commit_message_length=" + << (commit_data.__isset.commit_message ? commit_data.commit_message.size() : 0) + << ", has_written_bytes=" << commit_data.__isset.written_bytes; return commit_data; } diff --git a/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp b/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp index f4a984dac05..c7805fb9bd4 100644 --- a/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp +++ b/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp @@ -24,11 +24,20 @@ #include "format/transformer/vjni_format_transformer.h" #include "runtime/exec_env.h" #include "runtime/runtime_state.h" +#include "util/time.h" #include "util/uid_util.h" namespace doris { #include "common/compile_check_begin.h" +namespace { + +const char* mc_diag_bool(bool value) { + return value ? "true" : "false"; +} + +} // namespace + VMCTableWriter::VMCTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_expr_ctxs, std::shared_ptr<Dependency> dep, std::shared_ptr<Dependency> fin_dep) : AsyncResultWriter(output_expr_ctxs, dep, fin_dep), @@ -43,11 +52,19 @@ Status VMCTableWriter::init_properties(ObjectPool* pool) { Status VMCTableWriter::open(RuntimeState* state, RuntimeProfile* profile) { _state = state; + int64_t start_ms = MonotonicMillis(); LOG(INFO) << "VMCTableWriter::open" << ", fragment_instance_id=" << print_id(state->fragment_instance_id()) << ", per_fragment_instance_idx=" << state->per_fragment_instance_idx() << ", write_session_id=" << _mc_sink.write_session_id; + LOG(INFO) << "MC_DIAG stage=BE_TABLE_OPEN_ENTER" + << ", fragment_instance_id=" << print_id(state->fragment_instance_id()) + << ", per_fragment_instance_idx=" << state->per_fragment_instance_idx() + << ", table=" << (_mc_sink.__isset.table_name ? _mc_sink.table_name : "") + << ", txn_id=" << (_mc_sink.__isset.txn_id ? std::to_string(_mc_sink.txn_id) : "") + << ", write_session_id=" + << (_mc_sink.__isset.write_session_id ? _mc_sink.write_session_id : ""); _written_rows_counter = ADD_COUNTER(_operator_profile, "WrittenRows", TUnit::UNIT); _send_data_timer = ADD_TIMER(_operator_profile, "SendDataTime"); @@ -95,6 +112,18 @@ Status VMCTableWriter::open(RuntimeState* state, RuntimeProfile* profile) { } } + LOG(INFO) << "MC_DIAG stage=BE_TABLE_OPEN_EXIT" + << ", fragment_instance_id=" << print_id(state->fragment_instance_id()) + << ", per_fragment_instance_idx=" << state->per_fragment_instance_idx() + << ", table=" << (_mc_sink.__isset.table_name ? _mc_sink.table_name : "") + << ", txn_id=" << (_mc_sink.__isset.txn_id ? std::to_string(_mc_sink.txn_id) : "") + << ", write_session_id=" + << (_mc_sink.__isset.write_session_id ? _mc_sink.write_session_id : "") + << ", partition_columns=" << _partition_column_names.size() + << ", has_static_partition=" << mc_diag_bool(_has_static_partition) + << ", static_partition_spec=" << _static_partition_spec + << ", write_exprs=" << _write_output_vexpr_ctxs.size() + << ", cost_ms=" << MonotonicMillis() - start_ms; return Status::OK(); } @@ -124,6 +153,15 @@ std::map<std::string, std::string> VMCTableWriter::_build_base_writer_params() { params["fe_port"] = std::to_string(master_fe_addr.port); params["fe_rpc_timeout_ms"] = std::to_string(config::thrift_rpc_timeout_ms); params["fe_thrift_server_type"] = config::thrift_server_type_of_fe; + LOG(INFO) << "MC_DIAG stage=BE_TABLE_BUILD_WRITER_PARAMS" + << ", fragment_instance_id=" << print_id(_state->fragment_instance_id()) + << ", table=" << (_mc_sink.__isset.table_name ? _mc_sink.table_name : "") + << ", txn_id=" << (_mc_sink.__isset.txn_id ? std::to_string(_mc_sink.txn_id) : "") + << ", write_session_id=" + << (_mc_sink.__isset.write_session_id ? _mc_sink.write_session_id : "") + << ", fe=" << master_fe_addr.hostname << ":" << master_fe_addr.port + << ", read_timeout=" << (_mc_sink.__isset.read_timeout ? _mc_sink.read_timeout : 0) + << ", retry_count=" << (_mc_sink.__isset.retry_count ? _mc_sink.retry_count : 0); return params; } @@ -134,6 +172,13 @@ std::shared_ptr<VMCPartitionWriter> VMCTableWriter::_create_partition_writer( LOG(INFO) << "VMCTableWriter::_create_partition_writer" << ", fragment_instance_id=" << print_id(_state->fragment_instance_id()) << ", partition_spec=" << partition_spec; + LOG(INFO) << "MC_DIAG stage=BE_TABLE_CREATE_PARTITION_WRITER" + << ", fragment_instance_id=" << print_id(_state->fragment_instance_id()) + << ", table=" << (_mc_sink.__isset.table_name ? _mc_sink.table_name : "") + << ", txn_id=" << (_mc_sink.__isset.txn_id ? std::to_string(_mc_sink.txn_id) : "") + << ", write_session_id=" + << (_mc_sink.__isset.write_session_id ? _mc_sink.write_session_id : "") + << ", partition_spec=" << partition_spec; return std::make_shared<VMCPartitionWriter>(_state, _write_output_vexpr_ctxs, partition_spec, std::move(params)); } @@ -144,10 +189,41 @@ Status VMCTableWriter::write(RuntimeState* state, Block& block) { return Status::OK(); } + int64_t start_ms = MonotonicMillis(); + LOG(INFO) << "MC_DIAG stage=BE_TABLE_WRITE_ENTER" + << ", fragment_instance_id=" << print_id(state->fragment_instance_id()) + << ", table=" << (_mc_sink.__isset.table_name ? _mc_sink.table_name : "") + << ", txn_id=" << (_mc_sink.__isset.txn_id ? std::to_string(_mc_sink.txn_id) : "") + << ", write_session_id=" + << (_mc_sink.__isset.write_session_id ? _mc_sink.write_session_id : "") + << ", input_rows=" << block.rows() << ", input_columns=" << block.columns() + << ", has_static_partition=" << mc_diag_bool(_has_static_partition) + << ", partition_columns=" << _partition_column_names.size(); Block output_block; - RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_vec_output_expr_ctxs, block, - &output_block, false)); + int64_t expr_start_ms = MonotonicMillis(); + LOG(INFO) << "MC_DIAG stage=BE_TABLE_EXPR_BEFORE" + << ", fragment_instance_id=" << print_id(state->fragment_instance_id()) + << ", table=" << (_mc_sink.__isset.table_name ? _mc_sink.table_name : "") + << ", input_rows=" << block.rows() << ", input_columns=" << block.columns(); + Status status = VExprContext::get_output_block_after_execute_exprs(_vec_output_expr_ctxs, block, + &output_block, false); + LOG(INFO) << "MC_DIAG stage=BE_TABLE_EXPR_AFTER" + << ", fragment_instance_id=" << print_id(state->fragment_instance_id()) + << ", table=" << (_mc_sink.__isset.table_name ? _mc_sink.table_name : "") + << ", status=" << status.to_string() + << ", cost_ms=" << MonotonicMillis() - expr_start_ms; + RETURN_IF_ERROR(status); + LOG(INFO) << "MC_DIAG stage=BE_TABLE_MATERIALIZE_BEFORE" + << ", fragment_instance_id=" << print_id(state->fragment_instance_id()) + << ", table=" << (_mc_sink.__isset.table_name ? _mc_sink.table_name : "") + << ", output_rows=" << output_block.rows() + << ", output_columns=" << output_block.columns(); materialize_block_inplace(output_block); + LOG(INFO) << "MC_DIAG stage=BE_TABLE_MATERIALIZE_AFTER" + << ", fragment_instance_id=" << print_id(state->fragment_instance_id()) + << ", table=" << (_mc_sink.__isset.table_name ? _mc_sink.table_name : "") + << ", output_rows=" << output_block.rows() + << ", output_columns=" << output_block.columns(); _row_count += output_block.rows(); @@ -156,12 +232,46 @@ Status VMCTableWriter::write(RuntimeState* state, Block& block) { auto it = _partitions_to_writers.find(_static_partition_spec); if (it == _partitions_to_writers.end()) { auto writer = _create_partition_writer(_static_partition_spec); - RETURN_IF_ERROR(writer->open()); + LOG(INFO) << "MC_DIAG stage=BE_TABLE_PARTITION_OPEN_BEFORE" + << ", fragment_instance_id=" << print_id(state->fragment_instance_id()) + << ", table=" << (_mc_sink.__isset.table_name ? _mc_sink.table_name : "") + << ", txn_id=" << (_mc_sink.__isset.txn_id ? std::to_string(_mc_sink.txn_id) : "") + << ", write_session_id=" + << (_mc_sink.__isset.write_session_id ? _mc_sink.write_session_id : "") + << ", partition_spec=" << _static_partition_spec; + status = writer->open(); + LOG(INFO) << "MC_DIAG stage=BE_TABLE_PARTITION_OPEN_AFTER" + << ", fragment_instance_id=" << print_id(state->fragment_instance_id()) + << ", table=" << (_mc_sink.__isset.table_name ? _mc_sink.table_name : "") + << ", txn_id=" << (_mc_sink.__isset.txn_id ? std::to_string(_mc_sink.txn_id) : "") + << ", write_session_id=" + << (_mc_sink.__isset.write_session_id ? _mc_sink.write_session_id : "") + << ", partition_spec=" << _static_partition_spec + << ", status=" << status.to_string(); + RETURN_IF_ERROR(status); _partitions_to_writers.insert({_static_partition_spec, writer}); it = _partitions_to_writers.find(_static_partition_spec); } output_block.erase(_non_write_columns_indices); - return it->second->write(output_block); + LOG(INFO) << "MC_DIAG stage=BE_TABLE_PARTITION_WRITE_BEFORE" + << ", fragment_instance_id=" << print_id(state->fragment_instance_id()) + << ", table=" << (_mc_sink.__isset.table_name ? _mc_sink.table_name : "") + << ", txn_id=" << (_mc_sink.__isset.txn_id ? std::to_string(_mc_sink.txn_id) : "") + << ", write_session_id=" + << (_mc_sink.__isset.write_session_id ? _mc_sink.write_session_id : "") + << ", partition_spec=" << _static_partition_spec + << ", rows=" << output_block.rows() << ", columns=" << output_block.columns(); + status = it->second->write(output_block); + LOG(INFO) << "MC_DIAG stage=BE_TABLE_PARTITION_WRITE_AFTER" + << ", fragment_instance_id=" << print_id(state->fragment_instance_id()) + << ", table=" << (_mc_sink.__isset.table_name ? _mc_sink.table_name : "") + << ", txn_id=" << (_mc_sink.__isset.txn_id ? std::to_string(_mc_sink.txn_id) : "") + << ", write_session_id=" + << (_mc_sink.__isset.write_session_id ? _mc_sink.write_session_id : "") + << ", partition_spec=" << _static_partition_spec + << ", rows=" << output_block.rows() << ", status=" << status.to_string() + << ", cost_ms=" << MonotonicMillis() - start_ms; + return status; } // Case 2: Dynamic partition or non-partitioned table @@ -172,20 +282,81 @@ Status VMCTableWriter::write(RuntimeState* state, Block& block) { auto it = _partitions_to_writers.find(partition_key); if (it == _partitions_to_writers.end()) { auto writer = _create_partition_writer(""); - RETURN_IF_ERROR(writer->open()); + LOG(INFO) << "MC_DIAG stage=BE_TABLE_PARTITION_OPEN_BEFORE" + << ", fragment_instance_id=" << print_id(state->fragment_instance_id()) + << ", table=" << (_mc_sink.__isset.table_name ? _mc_sink.table_name : "") + << ", txn_id=" << (_mc_sink.__isset.txn_id ? std::to_string(_mc_sink.txn_id) : "") + << ", write_session_id=" + << (_mc_sink.__isset.write_session_id ? _mc_sink.write_session_id : "") + << ", partition_spec=" << partition_key + << ", dynamic_partition=" << mc_diag_bool(!_partition_column_names.empty()); + status = writer->open(); + LOG(INFO) << "MC_DIAG stage=BE_TABLE_PARTITION_OPEN_AFTER" + << ", fragment_instance_id=" << print_id(state->fragment_instance_id()) + << ", table=" << (_mc_sink.__isset.table_name ? _mc_sink.table_name : "") + << ", txn_id=" << (_mc_sink.__isset.txn_id ? std::to_string(_mc_sink.txn_id) : "") + << ", write_session_id=" + << (_mc_sink.__isset.write_session_id ? _mc_sink.write_session_id : "") + << ", partition_spec=" << partition_key << ", status=" << status.to_string(); + RETURN_IF_ERROR(status); _partitions_to_writers.insert({partition_key, writer}); it = _partitions_to_writers.find(partition_key); } - return it->second->write(output_block); + LOG(INFO) << "MC_DIAG stage=BE_TABLE_PARTITION_WRITE_BEFORE" + << ", fragment_instance_id=" << print_id(state->fragment_instance_id()) + << ", table=" << (_mc_sink.__isset.table_name ? _mc_sink.table_name : "") + << ", txn_id=" << (_mc_sink.__isset.txn_id ? std::to_string(_mc_sink.txn_id) : "") + << ", write_session_id=" + << (_mc_sink.__isset.write_session_id ? _mc_sink.write_session_id : "") + << ", partition_spec=" << partition_key + << ", dynamic_partition=" << mc_diag_bool(!_partition_column_names.empty()) + << ", rows=" << output_block.rows() << ", columns=" << output_block.columns(); + status = it->second->write(output_block); + LOG(INFO) << "MC_DIAG stage=BE_TABLE_PARTITION_WRITE_AFTER" + << ", fragment_instance_id=" << print_id(state->fragment_instance_id()) + << ", table=" << (_mc_sink.__isset.table_name ? _mc_sink.table_name : "") + << ", txn_id=" << (_mc_sink.__isset.txn_id ? std::to_string(_mc_sink.txn_id) : "") + << ", write_session_id=" + << (_mc_sink.__isset.write_session_id ? _mc_sink.write_session_id : "") + << ", partition_spec=" << partition_key << ", rows=" << output_block.rows() + << ", status=" << status.to_string() + << ", cost_ms=" << MonotonicMillis() - start_ms; + return status; } Status VMCTableWriter::close(Status status) { + int64_t start_ms = MonotonicMillis(); Status result_status; int64_t partitions_count = _partitions_to_writers.size(); + LOG(INFO) << "MC_DIAG stage=BE_TABLE_CLOSE_ENTER" + << ", fragment_instance_id=" << print_id(_state->fragment_instance_id()) + << ", table=" << (_mc_sink.__isset.table_name ? _mc_sink.table_name : "") + << ", txn_id=" << (_mc_sink.__isset.txn_id ? std::to_string(_mc_sink.txn_id) : "") + << ", write_session_id=" + << (_mc_sink.__isset.write_session_id ? _mc_sink.write_session_id : "") + << ", input_status=" << status.to_string() + << ", partitions_count=" << partitions_count + << ", row_count=" << _row_count; { SCOPED_RAW_TIMER(&_close_ns); for (const auto& [partition_spec, writer] : _partitions_to_writers) { + LOG(INFO) << "MC_DIAG stage=BE_TABLE_CLOSE_PARTITION_BEFORE" + << ", fragment_instance_id=" << print_id(_state->fragment_instance_id()) + << ", table=" << (_mc_sink.__isset.table_name ? _mc_sink.table_name : "") + << ", txn_id=" << (_mc_sink.__isset.txn_id ? std::to_string(_mc_sink.txn_id) : "") + << ", write_session_id=" + << (_mc_sink.__isset.write_session_id ? _mc_sink.write_session_id : "") + << ", partition_spec=" << partition_spec; + int64_t partition_close_start_ms = MonotonicMillis(); Status st = writer->close(status); + LOG(INFO) << "MC_DIAG stage=BE_TABLE_CLOSE_PARTITION_AFTER" + << ", fragment_instance_id=" << print_id(_state->fragment_instance_id()) + << ", table=" << (_mc_sink.__isset.table_name ? _mc_sink.table_name : "") + << ", txn_id=" << (_mc_sink.__isset.txn_id ? std::to_string(_mc_sink.txn_id) : "") + << ", write_session_id=" + << (_mc_sink.__isset.write_session_id ? _mc_sink.write_session_id : "") + << ", partition_spec=" << partition_spec << ", status=" << st.to_string() + << ", cost_ms=" << MonotonicMillis() - partition_close_start_ms; if (!st.ok()) { LOG(WARNING) << "VMCPartitionWriter close failed for partition " << partition_spec << ": " << st.to_string(); @@ -202,6 +373,16 @@ Status VMCTableWriter::close(Status status) { COUNTER_SET(_close_timer, _close_ns); COUNTER_SET(_partition_writers_count, partitions_count); } + LOG(INFO) << "MC_DIAG stage=BE_TABLE_CLOSE_EXIT" + << ", fragment_instance_id=" << print_id(_state->fragment_instance_id()) + << ", table=" << (_mc_sink.__isset.table_name ? _mc_sink.table_name : "") + << ", txn_id=" << (_mc_sink.__isset.txn_id ? std::to_string(_mc_sink.txn_id) : "") + << ", write_session_id=" + << (_mc_sink.__isset.write_session_id ? _mc_sink.write_session_id : "") + << ", result_status=" << result_status.to_string() + << ", partitions_count=" << partitions_count + << ", row_count=" << _row_count + << ", cost_ms=" << MonotonicMillis() - start_ms; return result_status; } diff --git a/be/src/format/transformer/vjni_format_transformer.cpp b/be/src/format/transformer/vjni_format_transformer.cpp index 782818f05aa..ac9a9bdb33f 100644 --- a/be/src/format/transformer/vjni_format_transformer.cpp +++ b/be/src/format/transformer/vjni_format_transformer.cpp @@ -19,9 +19,19 @@ #include "exec/connector/jni_connector.h" #include "runtime/runtime_state.h" +#include "util/time.h" namespace doris { +namespace { + +std::string mc_diag_param(const std::map<std::string, std::string>& params, const std::string& key) { + auto it = params.find(key); + return it == params.end() ? "" : it->second; +} + +} // namespace + VJniFormatTransformer::VJniFormatTransformer(RuntimeState* state, const VExprContextSPtrs& output_vexpr_ctxs, std::string writer_class, @@ -32,45 +42,118 @@ VJniFormatTransformer::VJniFormatTransformer(RuntimeState* state, Status VJniFormatTransformer::_init_jni_writer(JNIEnv* env, int batch_size) { // Load writer class via the same class loader as JniScanner + LOG(INFO) << "MC_DIAG stage=BE_JNI_INIT_WRITER_ENTER" + << ", writer_class=" << _writer_class + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id") + << ", partition_spec=" << mc_diag_param(_writer_params, "partition_spec") + << ", batch_size=" << batch_size; Jni::GlobalClass jni_writer_cls; - RETURN_IF_ERROR(Jni::Util::get_jni_scanner_class(env, _writer_class.c_str(), &jni_writer_cls)); + int64_t start_ms = MonotonicMillis(); + Status status = Jni::Util::get_jni_scanner_class(env, _writer_class.c_str(), &jni_writer_cls); + LOG(INFO) << "MC_DIAG stage=BE_JNI_GET_CLASS_AFTER" + << ", writer_class=" << _writer_class << ", status=" << status.to_string() + << ", cost_ms=" << MonotonicMillis() - start_ms; + RETURN_IF_ERROR(status); // Get constructor: (int batchSize, Map<String,String> params) Jni::MethodId writer_constructor; - RETURN_IF_ERROR( - jni_writer_cls.get_method(env, "<init>", "(ILjava/util/Map;)V", &writer_constructor)); + start_ms = MonotonicMillis(); + status = jni_writer_cls.get_method(env, "<init>", "(ILjava/util/Map;)V", &writer_constructor); + LOG(INFO) << "MC_DIAG stage=BE_JNI_GET_CONSTRUCTOR_AFTER" + << ", writer_class=" << _writer_class << ", status=" << status.to_string() + << ", cost_ms=" << MonotonicMillis() - start_ms; + RETURN_IF_ERROR(status); // Convert C++ params map to Java HashMap Jni::LocalObject hashmap_object; - RETURN_IF_ERROR(Jni::Util::convert_to_java_map(env, _writer_params, &hashmap_object)); + start_ms = MonotonicMillis(); + status = Jni::Util::convert_to_java_map(env, _writer_params, &hashmap_object); + LOG(INFO) << "MC_DIAG stage=BE_JNI_CONVERT_PARAMS_AFTER" + << ", writer_class=" << _writer_class << ", status=" << status.to_string() + << ", cost_ms=" << MonotonicMillis() - start_ms; + RETURN_IF_ERROR(status); // Create writer instance - RETURN_IF_ERROR(jni_writer_cls.new_object(env, writer_constructor) - .with_arg((jint)batch_size) - .with_arg(hashmap_object) - .call(&_jni_writer_obj)); + start_ms = MonotonicMillis(); + LOG(INFO) << "MC_DIAG stage=BE_JNI_NEW_WRITER_BEFORE" + << ", writer_class=" << _writer_class + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id"); + status = jni_writer_cls.new_object(env, writer_constructor) + .with_arg((jint)batch_size) + .with_arg(hashmap_object) + .call(&_jni_writer_obj); + LOG(INFO) << "MC_DIAG stage=BE_JNI_NEW_WRITER_AFTER" + << ", writer_class=" << _writer_class << ", status=" << status.to_string() + << ", cost_ms=" << MonotonicMillis() - start_ms; + RETURN_IF_ERROR(status); // Resolve method IDs - RETURN_IF_ERROR(jni_writer_cls.get_method(env, "open", "()V", &_jni_writer_open)); - RETURN_IF_ERROR( - jni_writer_cls.get_method(env, "write", "(Ljava/util/Map;)V", &_jni_writer_write)); - RETURN_IF_ERROR(jni_writer_cls.get_method(env, "close", "()V", &_jni_writer_close)); - RETURN_IF_ERROR(jni_writer_cls.get_method(env, "getStatistics", "()Ljava/util/Map;", - &_jni_writer_get_statistics)); + status = jni_writer_cls.get_method(env, "open", "()V", &_jni_writer_open); + RETURN_IF_ERROR(status); + status = jni_writer_cls.get_method(env, "write", "(Ljava/util/Map;)V", &_jni_writer_write); + RETURN_IF_ERROR(status); + status = jni_writer_cls.get_method(env, "close", "()V", &_jni_writer_close); + RETURN_IF_ERROR(status); + status = jni_writer_cls.get_method(env, "getStatistics", "()Ljava/util/Map;", + &_jni_writer_get_statistics); + RETURN_IF_ERROR(status); + LOG(INFO) << "MC_DIAG stage=BE_JNI_INIT_WRITER_EXIT" + << ", writer_class=" << _writer_class + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id"); return Status::OK(); } Status VJniFormatTransformer::open() { + int64_t open_start_ms = MonotonicMillis(); + LOG(INFO) << "MC_DIAG stage=BE_JNI_OPEN_ENTER" + << ", writer_class=" << _writer_class + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id") + << ", partition_spec=" << mc_diag_param(_writer_params, "partition_spec"); JNIEnv* env = nullptr; - RETURN_IF_ERROR(Jni::Env::Get(&env)); + Status status = Jni::Env::Get(&env); + if (!status.ok()) { + LOG(INFO) << "MC_DIAG stage=BE_JNI_OPEN_GET_ENV_AFTER" + << ", status=" << status.to_string(); + return status; + } int batch_size = _state->batch_size(); - RETURN_IF_ERROR(_init_jni_writer(env, batch_size)); + status = _init_jni_writer(env, batch_size); + if (!status.ok()) { + LOG(INFO) << "MC_DIAG stage=BE_JNI_OPEN_INIT_AFTER" + << ", status=" << status.to_string() + << ", cost_ms=" << MonotonicMillis() - open_start_ms; + return status; + } - RETURN_IF_ERROR(_jni_writer_obj.call_void_method(env, _jni_writer_open).call()); + int64_t java_open_start_ms = MonotonicMillis(); + LOG(INFO) << "MC_DIAG stage=BE_JAVA_OPEN_BEFORE" + << ", writer_class=" << _writer_class + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id"); + status = _jni_writer_obj.call_void_method(env, _jni_writer_open).call(); + LOG(INFO) << "MC_DIAG stage=BE_JAVA_OPEN_CALL_RETURNED" + << ", writer_class=" << _writer_class << ", status=" << status.to_string() + << ", cost_ms=" << MonotonicMillis() - java_open_start_ms; + RETURN_IF_ERROR(status); RETURN_ERROR_IF_EXC(env); _opened = true; + LOG(INFO) << "MC_DIAG stage=BE_JNI_OPEN_EXIT" + << ", writer_class=" << _writer_class + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id") + << ", cost_ms=" << MonotonicMillis() - open_start_ms; return Status::OK(); } @@ -79,20 +162,50 @@ Status VJniFormatTransformer::write(const Block& block) { return Status::OK(); } + int64_t write_start_ms = MonotonicMillis(); + LOG(INFO) << "MC_DIAG stage=BE_JNI_WRITE_ENTER" + << ", writer_class=" << _writer_class + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id") + << ", rows=" << block.rows() << ", columns=" << block.columns(); JNIEnv* env = nullptr; - RETURN_IF_ERROR(Jni::Env::Get(&env)); + Status status = Jni::Env::Get(&env); + if (!status.ok()) { + LOG(INFO) << "MC_DIAG stage=BE_JNI_WRITE_GET_ENV_AFTER" + << ", status=" << status.to_string(); + return status; + } // 1. Convert Block to Java table metadata (column addresses) Block* mutable_block = const_cast<Block*>(&block); std::unique_ptr<long[]> input_table; - RETURN_IF_ERROR(JniConnector::to_java_table(mutable_block, input_table)); + int64_t convert_start_ms = MonotonicMillis(); + LOG(INFO) << "MC_DIAG stage=BE_TO_JAVA_TABLE_BEFORE" + << ", writer_class=" << _writer_class + << ", table=" << mc_diag_param(_writer_params, "table") + << ", rows=" << block.rows() << ", columns=" << block.columns(); + status = JniConnector::to_java_table(mutable_block, input_table); + LOG(INFO) << "MC_DIAG stage=BE_TO_JAVA_TABLE_AFTER" + << ", writer_class=" << _writer_class << ", status=" << status.to_string() + << ", cost_ms=" << MonotonicMillis() - convert_start_ms; + RETURN_IF_ERROR(status); // 2. Cache schema on first call if (!_schema_cached) { + int64_t schema_start_ms = MonotonicMillis(); + LOG(INFO) << "MC_DIAG stage=BE_PARSE_SCHEMA_BEFORE" + << ", writer_class=" << _writer_class + << ", table=" << mc_diag_param(_writer_params, "table"); auto schema = JniConnector::parse_table_schema(mutable_block); _cached_required_fields = schema.first; _cached_columns_types = schema.second; _schema_cached = true; + LOG(INFO) << "MC_DIAG stage=BE_PARSE_SCHEMA_AFTER" + << ", writer_class=" << _writer_class + << ", required_fields_length=" << _cached_required_fields.size() + << ", columns_types_length=" << _cached_columns_types.size() + << ", cost_ms=" << MonotonicMillis() - schema_start_ms; } // 3. Build input params map for Java writer @@ -103,28 +216,83 @@ Status VJniFormatTransformer::write(const Block& block) { // 4. Convert to Java Map and call writer.write(inputParams) Jni::LocalObject input_map; - RETURN_IF_ERROR(Jni::Util::convert_to_java_map(env, input_params, &input_map)); + int64_t map_start_ms = MonotonicMillis(); + status = Jni::Util::convert_to_java_map(env, input_params, &input_map); + LOG(INFO) << "MC_DIAG stage=BE_CONVERT_INPUT_MAP_AFTER" + << ", writer_class=" << _writer_class << ", status=" << status.to_string() + << ", cost_ms=" << MonotonicMillis() - map_start_ms; + RETURN_IF_ERROR(status); - RETURN_IF_ERROR( - _jni_writer_obj.call_void_method(env, _jni_writer_write).with_arg(input_map).call()); + int64_t java_write_start_ms = MonotonicMillis(); + LOG(INFO) << "MC_DIAG stage=BE_JAVA_WRITE_BEFORE" + << ", writer_class=" << _writer_class + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id") + << ", rows=" << block.rows() << ", columns=" << block.columns(); + status = _jni_writer_obj.call_void_method(env, _jni_writer_write).with_arg(input_map).call(); + LOG(INFO) << "MC_DIAG stage=BE_JAVA_WRITE_CALL_RETURNED" + << ", writer_class=" << _writer_class << ", status=" << status.to_string() + << ", cost_ms=" << MonotonicMillis() - java_write_start_ms; + RETURN_IF_ERROR(status); RETURN_ERROR_IF_EXC(env); _cur_written_rows += block.rows(); + LOG(INFO) << "MC_DIAG stage=BE_JNI_WRITE_EXIT" + << ", writer_class=" << _writer_class + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id") + << ", rows=" << block.rows() + << ", cur_written_rows=" << _cur_written_rows + << ", cost_ms=" << MonotonicMillis() - write_start_ms; return Status::OK(); } Status VJniFormatTransformer::close() { if (_closed || !_opened) { + LOG(INFO) << "MC_DIAG stage=BE_JNI_CLOSE_SKIP" + << ", writer_class=" << _writer_class + << ", table=" << mc_diag_param(_writer_params, "table") + << ", opened=" << _opened << ", closed=" << _closed; return Status::OK(); } _closed = true; + int64_t close_start_ms = MonotonicMillis(); + LOG(INFO) << "MC_DIAG stage=BE_JNI_CLOSE_ENTER" + << ", writer_class=" << _writer_class + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id") + << ", cur_written_rows=" << _cur_written_rows; JNIEnv* env = nullptr; - RETURN_IF_ERROR(Jni::Env::Get(&env)); + Status status = Jni::Env::Get(&env); + if (!status.ok()) { + LOG(INFO) << "MC_DIAG stage=BE_JNI_CLOSE_GET_ENV_AFTER" + << ", status=" << status.to_string(); + return status; + } - RETURN_IF_ERROR(_jni_writer_obj.call_void_method(env, _jni_writer_close).call()); + int64_t java_close_start_ms = MonotonicMillis(); + LOG(INFO) << "MC_DIAG stage=BE_JAVA_CLOSE_BEFORE" + << ", writer_class=" << _writer_class + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id"); + status = _jni_writer_obj.call_void_method(env, _jni_writer_close).call(); + LOG(INFO) << "MC_DIAG stage=BE_JAVA_CLOSE_CALL_RETURNED" + << ", writer_class=" << _writer_class << ", status=" << status.to_string() + << ", cost_ms=" << MonotonicMillis() - java_close_start_ms; + RETURN_IF_ERROR(status); RETURN_ERROR_IF_EXC(env); + LOG(INFO) << "MC_DIAG stage=BE_JNI_CLOSE_EXIT" + << ", writer_class=" << _writer_class + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id") + << ", cost_ms=" << MonotonicMillis() - close_start_ms; return Status::OK(); } @@ -136,26 +304,57 @@ int64_t VJniFormatTransformer::written_len() { std::map<std::string, std::string> VJniFormatTransformer::get_statistics() { std::map<std::string, std::string> result; if (!_opened) { + LOG(INFO) << "MC_DIAG stage=BE_JNI_GET_STATS_SKIP_NOT_OPENED" + << ", writer_class=" << _writer_class + << ", table=" << mc_diag_param(_writer_params, "table"); return result; } + int64_t start_ms = MonotonicMillis(); + LOG(INFO) << "MC_DIAG stage=BE_JNI_GET_STATS_ENTER" + << ", writer_class=" << _writer_class + << ", table=" << mc_diag_param(_writer_params, "table") + << ", txn_id=" << mc_diag_param(_writer_params, "txn_id") + << ", write_session_id=" << mc_diag_param(_writer_params, "write_session_id"); JNIEnv* env = nullptr; - if (!Jni::Env::Get(&env).ok()) { + Status status = Jni::Env::Get(&env); + if (!status.ok()) { + LOG(INFO) << "MC_DIAG stage=BE_JNI_GET_STATS_GET_ENV_AFTER" + << ", status=" << status.to_string(); return result; } Jni::LocalObject stats_map; - if (!_jni_writer_obj.call_object_method(env, _jni_writer_get_statistics) - .call(&stats_map) - .ok()) { + status = _jni_writer_obj.call_object_method(env, _jni_writer_get_statistics).call(&stats_map); + LOG(INFO) << "MC_DIAG stage=BE_JAVA_GET_STATS_CALL_RETURNED" + << ", writer_class=" << _writer_class << ", status=" << status.to_string() + << ", cost_ms=" << MonotonicMillis() - start_ms; + if (!status.ok()) { return result; } if (stats_map.uninitialized()) { + LOG(INFO) << "MC_DIAG stage=BE_JNI_GET_STATS_EMPTY" + << ", writer_class=" << _writer_class + << ", table=" << mc_diag_param(_writer_params, "table"); return result; } // Convert Java Map<String,String> to C++ map static_cast<void>(Jni::Util::convert_to_cpp_map(env, stats_map, &result)); + auto commit_it = result.find("mc_commit_message"); + auto written_rows_it = result.find("counter:WrittenRows"); + auto written_bytes_it = result.find("bytes:WrittenBytes"); + LOG(INFO) << "MC_DIAG stage=BE_JNI_GET_STATS_EXIT" + << ", writer_class=" << _writer_class + << ", table=" << mc_diag_param(_writer_params, "table") + << ", stats_size=" << result.size() + << ", commit_message_length=" + << (commit_it == result.end() ? 0 : commit_it->second.size()) + << ", written_rows=" + << (written_rows_it == result.end() ? "" : written_rows_it->second) + << ", written_bytes=" + << (written_bytes_it == result.end() ? "" : written_bytes_it->second) + << ", cost_ms=" << MonotonicMillis() - start_ms; return result; } diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniWriter.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniWriter.java index 2dfefd2ac79..b94f7a0908f 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniWriter.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniWriter.java @@ -20,6 +20,8 @@ package org.apache.doris.common.jni; import org.apache.doris.common.jni.vec.ColumnType; import org.apache.doris.common.jni.vec.VectorTable; +import org.apache.log4j.Logger; + import java.io.IOException; import java.util.Collections; import java.util.Map; @@ -32,6 +34,8 @@ import java.util.Map; * Lifecycle: open() -> write() [repeated] -> close() */ public abstract class JniWriter { + private static final Logger LOG = Logger.getLogger(JniWriter.class); + protected int batchSize; protected Map<String, String> params; protected ColumnType[] columnTypes; @@ -51,10 +55,21 @@ public abstract class JniWriter { * then delegates to writeInternal. */ public void write(Map<String, String> inputParams) throws IOException { + long writeEnterNs = System.nanoTime(); + String requiredFields = inputParams.get("required_fields"); + String columnsTypes = inputParams.get("columns_types"); + LOG.info("MC_DIAG stage=JNI_WRITER_WRITE_ENTER writer=" + getClass().getName() + + ", batchSize=" + batchSize + + ", schemaCached=" + (columnTypes != null) + + ", requiredFieldsLength=" + (requiredFields == null ? 0 : requiredFields.length()) + + ", columnsTypesLength=" + (columnsTypes == null ? 0 : columnsTypes.length()) + + ", thread=" + Thread.currentThread().getName()); + // Parse and cache schema on first call if (columnTypes == null) { - String requiredFields = inputParams.get("required_fields"); - String columnsTypes = inputParams.get("columns_types"); + long schemaStartNs = System.nanoTime(); + LOG.info("MC_DIAG stage=JNI_WRITER_SCHEMA_PARSE_BEFORE writer=" + getClass().getName() + + ", thread=" + Thread.currentThread().getName()); if (requiredFields != null && !requiredFields.isEmpty()) { fields = requiredFields.split(","); String[] typeStrs = columnsTypes.split("#"); @@ -66,15 +81,37 @@ public abstract class JniWriter { fields = new String[0]; columnTypes = new ColumnType[0]; } + LOG.info("MC_DIAG stage=JNI_WRITER_SCHEMA_PARSE_AFTER writer=" + getClass().getName() + + ", fields=" + fields.length + + ", columnTypes=" + columnTypes.length + + ", costMs=" + elapsedMs(schemaStartNs) + + ", thread=" + Thread.currentThread().getName()); } long startRead = System.nanoTime(); + LOG.info("MC_DIAG stage=JNI_WRITER_CREATE_READABLE_TABLE_BEFORE writer=" + getClass().getName() + + ", thread=" + Thread.currentThread().getName()); VectorTable inputTable = VectorTable.createReadableTable(inputParams); readTableTime += System.nanoTime() - startRead; + LOG.info("MC_DIAG stage=JNI_WRITER_CREATE_READABLE_TABLE_AFTER writer=" + getClass().getName() + + ", rows=" + inputTable.getNumRows() + + ", columns=" + inputTable.getNumColumns() + + ", costMs=" + elapsedMs(startRead) + + ", thread=" + Thread.currentThread().getName()); long startWrite = System.nanoTime(); + LOG.info("MC_DIAG stage=JNI_WRITER_WRITE_INTERNAL_BEFORE writer=" + getClass().getName() + + ", rows=" + inputTable.getNumRows() + + ", columns=" + inputTable.getNumColumns() + + ", thread=" + Thread.currentThread().getName()); writeInternal(inputTable); writeTime += System.nanoTime() - startWrite; + LOG.info("MC_DIAG stage=JNI_WRITER_WRITE_INTERNAL_AFTER writer=" + getClass().getName() + + ", rows=" + inputTable.getNumRows() + + ", columns=" + inputTable.getNumColumns() + + ", writeCostMs=" + elapsedMs(startWrite) + + ", totalCostMs=" + elapsedMs(writeEnterNs) + + ", thread=" + Thread.currentThread().getName()); } protected abstract void writeInternal(VectorTable inputTable) throws IOException; @@ -96,4 +133,8 @@ public abstract class JniWriter { public long getReadTableTime() { return readTableTime; } + + private static long elapsedMs(long startNs) { + return (System.nanoTime() - startNs) / 1_000_000L; + } } diff --git a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java index 82b58f48493..1c907b7c331 100644 --- a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java +++ b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java @@ -87,12 +87,22 @@ class MaxComputeFeClient implements AutoCloseable { throw new IOException("empty MaxCompute write_session_id for block_id allocation"); } + long startNs = System.nanoTime(); + LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_BLOCK_ID_BEFORE txnId=" + txnId + + ", writeSessionId=" + writeSessionId + + ", masterFe=" + formatAddress(masterAddress) + + ", rpcTimeoutMs=" + rpcTimeoutMs); TMaxComputeBlockIdRequest request = buildBlockIdRequest(txnId, writeSessionId); - return callWithMasterRedirect( + long blockId = callWithMasterRedirect( "allocate MaxCompute block_id", client -> client.getMaxComputeBlockIdRange(request), (result, requestAddress, retryTimes) -> handleBlockIdResult(result, requestAddress, retryTimes, txnId, writeSessionId)); + LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_BLOCK_ID_AFTER txnId=" + txnId + + ", writeSessionId=" + writeSessionId + + ", blockId=" + blockId + + ", costMs=" + elapsedMs(startNs)); + return blockId; } @Override @@ -109,16 +119,30 @@ class MaxComputeFeClient implements AutoCloseable { for (int retryTimes = 0; retryTimes < FETCH_BLOCK_ID_MAX_RETRY_TIMES; retryTimes++) { TNetworkAddress requestAddress = copyAddress(masterAddress); T result; + long rpcStartNs = System.nanoTime(); + LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_RPC_BEFORE operation=" + operation + + ", retry=" + retryTimes + + ", fe=" + formatAddress(requestAddress) + + ", rpcTimeoutMs=" + rpcTimeoutMs + + ", framedTransport=" + useFramedTransport()); try { result = rpcExecutor.call(requestAddress, rpcTimeoutMs, useFramedTransport(), call); } catch (Exception e) { lastException = e; rpcExecutor.close(); + LOG.warn("MC_DIAG stage=JAVA_FE_CLIENT_RPC_ERROR operation=" + operation + + ", retry=" + retryTimes + + ", fe=" + formatAddress(requestAddress) + + ", costMs=" + elapsedMs(rpcStartNs), e); LOG.warn("Failed to " + operation + ", rpc failure, retry_time=" + retryTimes + ", fe=" + formatAddress(requestAddress), e); sleepBeforeRetry(); continue; } + LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_RPC_AFTER operation=" + operation + + ", retry=" + retryTimes + + ", fe=" + formatAddress(requestAddress) + + ", costMs=" + elapsedMs(rpcStartNs)); try { return handler.handle(result, requestAddress, retryTimes); @@ -155,6 +179,11 @@ class MaxComputeFeClient implements AutoCloseable { + formatAddress(requestAddress) + ", switch to FE@" + formatAddress(result.getMasterAddress()) + ", retry_time=" + retryTimes + ", txn_id=" + txnId + ", write_session_id=" + writeSessionId); + LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_BLOCK_ID_NOT_MASTER txnId=" + txnId + + ", writeSessionId=" + writeSessionId + + ", requestFe=" + formatAddress(requestAddress) + + ", masterFe=" + formatAddress(result.getMasterAddress()) + + ", retry=" + retryTimes); throw new NotMasterException(result.getMasterAddress()); } @@ -173,11 +202,11 @@ class MaxComputeFeClient implements AutoCloseable { + result.getLength() + ", txn_id=" + txnId + ", write_session_id=" + writeSessionId); } - if (LOG.isDebugEnabled()) { - LOG.debug("Allocated MaxCompute block_id from FE@" + formatAddress(requestAddress) - + ", txn_id=" + txnId + ", write_session_id=" + writeSessionId - + ", block_id=" + result.getStart()); - } + LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_BLOCK_ID_RESULT txnId=" + txnId + + ", writeSessionId=" + writeSessionId + + ", fe=" + formatAddress(requestAddress) + + ", retry=" + retryTimes + + ", blockId=" + result.getStart()); return result.getStart(); } @@ -248,6 +277,10 @@ class MaxComputeFeClient implements AutoCloseable { return address.getHostname() + ":" + address.getPort(); } + private static long elapsedMs(long startNs) { + return (System.nanoTime() - startNs) / 1_000_000L; + } + interface RpcExecutor { <T> T call(TNetworkAddress address, int timeoutMs, boolean useFramedTransport, FeCall<T> call) throws Exception; @@ -273,10 +306,22 @@ class MaxComputeFeClient implements AutoCloseable { @Override public synchronized <T> T call(TNetworkAddress address, int timeoutMs, boolean useFramedTransport, FeCall<T> call) throws Exception { + long callStartNs = System.nanoTime(); + LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_EXECUTOR_CALL_ENTER fe=" + formatAddress(address) + + ", timeoutMs=" + timeoutMs + + ", framedTransport=" + useFramedTransport); ensureConnected(address, timeoutMs, useFramedTransport); try { - return call.call(client); + long thriftCallStartNs = System.nanoTime(); + LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_THRIFT_CALL_BEFORE fe=" + formatAddress(address)); + T result = call.call(client); + LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_THRIFT_CALL_AFTER fe=" + formatAddress(address) + + ", thriftCallCostMs=" + elapsedMs(thriftCallStartNs) + + ", totalCostMs=" + elapsedMs(callStartNs)); + return result; } catch (Exception e) { + LOG.warn("MC_DIAG stage=JAVA_FE_CLIENT_EXECUTOR_CALL_ERROR fe=" + formatAddress(address) + + ", costMs=" + elapsedMs(callStartNs), e); close(); throw e; } @@ -297,10 +342,15 @@ class MaxComputeFeClient implements AutoCloseable { if (client != null && transport != null && transport.isOpen() && connectedFramedTransport == useFramedTransport && sameAddress(connectedAddress, address)) { + LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_REUSE_CONNECTION fe=" + formatAddress(address)); return; } close(); + long connectStartNs = System.nanoTime(); + LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_CONNECT_BEFORE fe=" + formatAddress(address) + + ", timeoutMs=" + timeoutMs + + ", framedTransport=" + useFramedTransport); TTransport newTransport = createTransport(address, timeoutMs, useFramedTransport); try { newTransport.open(); @@ -308,7 +358,11 @@ class MaxComputeFeClient implements AutoCloseable { client = new FrontendService.Client(new TBinaryProtocol(transport)); connectedAddress = copyAddress(address); connectedFramedTransport = useFramedTransport; + LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_CONNECT_AFTER fe=" + formatAddress(address) + + ", costMs=" + elapsedMs(connectStartNs)); } catch (Exception e) { + LOG.warn("MC_DIAG stage=JAVA_FE_CLIENT_CONNECT_ERROR fe=" + formatAddress(address) + + ", costMs=" + elapsedMs(connectStartNs), e); newTransport.close(); throw e; } diff --git a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java index 9788184057e..334f7124f59 100644 --- a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java +++ b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java @@ -156,18 +156,37 @@ public class MaxComputeJniWriter extends JniWriter { params.getOrDefault(MCProperties.WRITE_MAX_BLOCK_BYTES, MCProperties.DEFAULT_WRITE_MAX_BLOCK_BYTES)); this.feClient = MaxComputeFeClient.create(params); + logDiag("JAVA_WRITER_CONSTRUCTED", + "connectTimeout=" + connectTimeout + + ", readTimeout=" + readTimeout + + ", retryCount=" + retryCount + + ", maxBlockBytes=" + maxBlockBytes + + ", preallocatedBlockId=" + preallocatedBlockId); } @Override public void open() throws IOException { + logDiag("JAVA_OPEN_ENTER", + "connectTimeout=" + connectTimeout + + ", readTimeout=" + readTimeout + + ", retryCount=" + retryCount + + ", maxBlockBytes=" + maxBlockBytes); try { + long stageStartNs = System.nanoTime(); + logDiag("JAVA_CREATE_MC_CLIENT_BEFORE", ""); Odps odps = MCUtils.createMcClient(params); odps.setDefaultProject(project); odps.setEndpoint(endpoint); + logDiag("JAVA_CREATE_MC_CLIENT_AFTER", "costMs=" + elapsedMs(stageStartNs)); + stageStartNs = System.nanoTime(); + logDiag("JAVA_BUILD_CREDENTIALS_BEFORE", ""); Credentials credentials = Credentials.newBuilder().withAccount(odps.getAccount()) .withAppAccount(odps.getAppAccount()).build(); + logDiag("JAVA_BUILD_CREDENTIALS_AFTER", "costMs=" + elapsedMs(stageStartNs)); + stageStartNs = System.nanoTime(); + logDiag("JAVA_BUILD_SETTINGS_BEFORE", ""); RestOptions restOptions = RestOptions.newBuilder() .withConnectTimeout(connectTimeout) .withReadTimeout(readTimeout) @@ -179,16 +198,22 @@ public class MaxComputeJniWriter extends JniWriter { .withQuotaName(Strings.isNullOrEmpty(quota) ? null : quota) .withRestOptions(restOptions) .build(); + logDiag("JAVA_BUILD_SETTINGS_AFTER", "costMs=" + elapsedMs(stageStartNs)); // Restore the write session created by FE + stageStartNs = System.nanoTime(); + logDiag("JAVA_RESTORE_SESSION_BEFORE", ""); writeSession = new TableWriteSessionBuilder() .identifier(com.aliyun.odps.table.TableIdentifier.of(project, tableName)) .withSessionId(writeSessionId) .withSettings(settings) .buildBatchWriteSession(); + logDiag("JAVA_RESTORE_SESSION_AFTER", "costMs=" + elapsedMs(stageStartNs)); // SDK skips ArrowOptions when restoring session via withSessionId, // set it via reflection to avoid NPE in ArrowWriterImpl + stageStartNs = System.nanoTime(); + logDiag("JAVA_SET_ARROW_OPTIONS_BEFORE", ""); ArrowOptions arrowOptions = ArrowOptions.newBuilder() .withDatetimeUnit(TimestampUnit.MILLI) .withTimestampUnit(TimestampUnit.MILLI) @@ -197,8 +222,11 @@ public class MaxComputeJniWriter extends JniWriter { .getSuperclass().getDeclaredField("arrowOptions"); arrowField.setAccessible(true); arrowField.set(writeSession, arrowOptions); + logDiag("JAVA_SET_ARROW_OPTIONS_AFTER", "costMs=" + elapsedMs(stageStartNs)); // Get schema info for type mapping + stageStartNs = System.nanoTime(); + logDiag("JAVA_REQUIRED_SCHEMA_BEFORE", ""); com.aliyun.odps.table.DataSchema dataSchema = writeSession.requiredSchema(); columnTypeInfos = new java.util.ArrayList<>(); columnNames = new java.util.ArrayList<>(); @@ -206,20 +234,30 @@ public class MaxComputeJniWriter extends JniWriter { columnTypeInfos.add(col.getTypeInfo()); columnNames.add(col.getName()); } + logDiag("JAVA_REQUIRED_SCHEMA_AFTER", + "columns=" + columnNames.size() + ", costMs=" + elapsedMs(stageStartNs)); + stageStartNs = System.nanoTime(); + logDiag("JAVA_ALLOCATOR_CREATE_BEFORE", ""); allocator = new RootAllocator(Long.MAX_VALUE); + logDiag("JAVA_ALLOCATOR_CREATE_AFTER", "costMs=" + elapsedMs(stageStartNs)); + stageStartNs = System.nanoTime(); + logDiag("JAVA_WRITER_OPTIONS_BEFORE", ""); writerOptions = WriterOptions.newBuilder() .withSettings(settings) .withCompressionCodec(CompressionCodec.ZSTD) .build(); + logDiag("JAVA_WRITER_OPTIONS_AFTER", "costMs=" + elapsedMs(stageStartNs)); openBatchWriter(resolveInitialBlockId()); LOG.info("MaxComputeJniWriter opened: project=" + project + ", table=" + tableName + ", writeSessionId=" + writeSessionId + ", partitionSpec=" + partitionSpec + ", blockId=" + currentBlockId); + logDiag("JAVA_OPEN_EXIT", ""); } catch (Exception e) { String errorMsg = "Failed to open MaxCompute write session for table " + project + "." + tableName; + logDiag("JAVA_OPEN_ERROR", "error=" + e.getClass().getName() + ": " + e.getMessage()); LOG.error(errorMsg, e); throw new IOException(errorMsg, e); } @@ -233,58 +271,112 @@ public class MaxComputeJniWriter extends JniWriter { return; } + long startNs = System.nanoTime(); + logDiag("JAVA_WRITE_INTERNAL_ENTER", "rows=" + numRows + ", columns=" + numCols); try { writeRowsWithRowChecks(inputTable, numRows, numCols); + logDiag("JAVA_WRITE_INTERNAL_EXIT", + "rows=" + numRows + ", columns=" + numCols + ", costMs=" + elapsedMs(startNs)); } catch (Exception e) { String errorMsg = "Failed to write data to MaxCompute table " + project + "." + tableName; + logDiag("JAVA_WRITE_INTERNAL_ERROR", + "rows=" + numRows + ", columns=" + numCols + + ", costMs=" + elapsedMs(startNs) + + ", error=" + e.getClass().getName() + ": " + e.getMessage()); LOG.error(errorMsg, e); throw new IOException(errorMsg, e); } } private long resolveInitialBlockId() throws IOException { - return preallocatedBlockId != null ? preallocatedBlockId : requestBlockId(); + if (preallocatedBlockId != null) { + logDiag("JAVA_RESOLVE_INITIAL_BLOCK_PREALLOCATED", "blockId=" + preallocatedBlockId); + return preallocatedBlockId; + } + return requestBlockId(); } private long requestBlockId() throws IOException { - return feClient.requestBlockId(txnId, writeSessionId); + long startNs = System.nanoTime(); + logDiag("JAVA_REQUEST_BLOCK_ID_BEFORE", ""); + long blockId = feClient.requestBlockId(txnId, writeSessionId); + logDiag("JAVA_REQUEST_BLOCK_ID_AFTER", + "newBlockId=" + blockId + ", costMs=" + elapsedMs(startNs)); + return blockId; } private void openBatchWriter(long blockId) throws IOException { + long startNs = System.nanoTime(); + logDiag("JAVA_OPEN_BATCH_WRITER_BEFORE", "targetBlockId=" + blockId); currentBlockId = blockId; currentBlockWrittenBytes = 0L; batchWriter = writeSession.createArrowWriter(blockId, WriterAttemptId.of(0), writerOptions); + logDiag("JAVA_OPEN_BATCH_WRITER_AFTER", + "targetBlockId=" + blockId + ", costMs=" + elapsedMs(startNs)); } private void closeCurrentBatchWriterAndCollectCommit() throws IOException { if (batchWriter == null) { + logDiag("JAVA_COMMIT_SKIP_NO_BATCH_WRITER", ""); return; } + long startNs = System.nanoTime(); + logDiag("JAVA_COMMIT_BEFORE", + "currentBlockWrittenBytes=" + currentBlockWrittenBytes + + ", commitMessageCount=" + commitMessages.size()); WriterCommitMessage commitMessage = batchWriter.commit(); if (commitMessage != null) { commitMessages.add(commitMessage); } batchWriter = null; + logDiag("JAVA_COMMIT_AFTER", + "commitMessageAdded=" + (commitMessage != null) + + ", commitMessageCount=" + commitMessages.size() + + ", costMs=" + elapsedMs(startNs)); } private void rotateCurrentBatchWriter() throws IOException { + long startNs = System.nanoTime(); + logDiag("JAVA_ROTATE_BEFORE", + "currentBlockWrittenBytes=" + currentBlockWrittenBytes + + ", commitMessageCount=" + commitMessages.size()); closeCurrentBatchWriterAndCollectCommit(); openBatchWriter(requestBlockId()); + logDiag("JAVA_ROTATE_AFTER", "costMs=" + elapsedMs(startNs)); } private void writeRowsWithRowChecks(VectorTable inputTable, int numRows, int numCols) throws IOException { + logDiag("JAVA_WRITE_ROWS_ENTER", "rows=" + numRows + ", columns=" + numCols); int rowStart = 0; while (rowStart < numRows) { int rowEnd = rowStart; long batchEstimatedBytes = 0L; boolean rotateAfterWrite = false; + int estimatedRows = 0; + long rangeStartNs = System.nanoTime(); + logDiag("JAVA_RANGE_SELECT_BEFORE", + "rowStart=" + rowStart + + ", rows=" + numRows + + ", currentBlockWrittenBytes=" + currentBlockWrittenBytes); while (rowEnd < numRows) { + if (estimatedRows == 0 || estimatedRows % 1024 == 0) { + logDiag("JAVA_RANGE_SELECT_PROGRESS", + "rowStart=" + rowStart + + ", probingRow=" + rowEnd + + ", estimatedRows=" + estimatedRows + + ", batchEstimatedBytes=" + batchEstimatedBytes); + } long rowEstimatedBytes = estimateSingleRowPayloadBytes(inputTable, numCols, rowEnd); + estimatedRows++; boolean exceedsHardLimit = currentBlockWrittenBytes + batchEstimatedBytes + rowEstimatedBytes > maxBlockBytes; if (exceedsHardLimit) { if (rowEnd == rowStart) { if (currentBlockWrittenBytes > 0) { + logDiag("JAVA_RANGE_SELECT_ROTATE_FOR_OVERSIZE_ROW", + "rowStart=" + rowStart + + ", rowEstimatedBytes=" + rowEstimatedBytes + + ", currentBlockWrittenBytes=" + currentBlockWrittenBytes); rotateCurrentBatchWriter(); continue; } @@ -301,28 +393,75 @@ public class MaxComputeJniWriter extends JniWriter { break; } } + logDiag("JAVA_RANGE_SELECT_AFTER", + "rowStart=" + rowStart + + ", rowEnd=" + rowEnd + + ", selectedRows=" + (rowEnd - rowStart) + + ", estimatedRows=" + estimatedRows + + ", batchEstimatedBytes=" + batchEstimatedBytes + + ", rotateAfterWrite=" + rotateAfterWrite + + ", costMs=" + elapsedMs(rangeStartNs)); if (rowEnd == rowStart) { + long fallbackStartNs = System.nanoTime(); + logDiag("JAVA_RANGE_SELECT_FALLBACK_BEFORE", "rowStart=" + rowStart); long rowEstimatedBytes = estimateSingleRowPayloadBytes(inputTable, numCols, rowStart); batchEstimatedBytes = rowEstimatedBytes; rowEnd = rowStart + 1; rotateAfterWrite = true; + logDiag("JAVA_RANGE_SELECT_FALLBACK_AFTER", + "rowStart=" + rowStart + + ", rowEstimatedBytes=" + rowEstimatedBytes + + ", costMs=" + elapsedMs(fallbackStartNs)); } - try (VectorSchemaRoot root = buildRowRangeRoot(inputTable, numCols, rowStart, rowEnd)) { + long buildStartNs = System.nanoTime(); + logDiag("JAVA_BUILD_ROOT_BEFORE", + "rowStart=" + rowStart + ", rowEnd=" + rowEnd + + ", selectedRows=" + (rowEnd - rowStart)); + try (VectorSchemaRoot root = buildRowRangeRoot(inputTable, numCols, rowStart, rowEnd, true)) { + logDiag("JAVA_BUILD_ROOT_AFTER", + "rowStart=" + rowStart + ", rowEnd=" + rowEnd + + ", selectedRows=" + (rowEnd - rowStart) + + ", costMs=" + elapsedMs(buildStartNs)); + long writeStartNs = System.nanoTime(); + logDiag("JAVA_BATCH_WRITE_BEFORE", + "rowStart=" + rowStart + ", rowEnd=" + rowEnd + + ", selectedRows=" + (rowEnd - rowStart) + + ", batchEstimatedBytes=" + batchEstimatedBytes); batchWriter.write(root); + logDiag("JAVA_BATCH_WRITE_AFTER", + "rowStart=" + rowStart + ", rowEnd=" + rowEnd + + ", selectedRows=" + (rowEnd - rowStart) + + ", costMs=" + elapsedMs(writeStartNs)); } + long flushStartNs = System.nanoTime(); + logDiag("JAVA_FLUSH_BEFORE", + "rowStart=" + rowStart + ", rowEnd=" + rowEnd + + ", selectedRows=" + (rowEnd - rowStart)); batchWriter.flush(); + logDiag("JAVA_FLUSH_AFTER", + "rowStart=" + rowStart + ", rowEnd=" + rowEnd + + ", selectedRows=" + (rowEnd - rowStart) + + ", costMs=" + elapsedMs(flushStartNs)); int rowsWrittenNow = rowEnd - rowStart; writtenRows += rowsWrittenNow; currentBlockWrittenBytes += batchEstimatedBytes; writtenBytes += batchEstimatedBytes; + logDiag("JAVA_BATCH_DONE", + "rowsWrittenNow=" + rowsWrittenNow + + ", nextRowStart=" + rowEnd + + ", currentBlockWrittenBytes=" + currentBlockWrittenBytes + + ", writtenRows=" + writtenRows + + ", writtenBytes=" + writtenBytes + + ", rotateAfterWrite=" + rotateAfterWrite); rowStart = rowEnd; if (rotateAfterWrite && rowStart < numRows) { rotateCurrentBatchWriter(); } } + logDiag("JAVA_WRITE_ROWS_EXIT", "rows=" + numRows + ", columns=" + numCols); } private static class CountingDiscardOutputStream extends OutputStream { @@ -339,7 +478,7 @@ public class MaxComputeJniWriter extends JniWriter { private long estimateSingleRowPayloadBytes(VectorTable inputTable, int numCols, int rowIndex) throws IOException { - try (VectorSchemaRoot root = buildRowRangeRoot(inputTable, numCols, rowIndex, rowIndex + 1); + try (VectorSchemaRoot root = buildRowRangeRoot(inputTable, numCols, rowIndex, rowIndex + 1, false); ArrowWriter estimator = ArrowWriterFactory.getRecordBatchWriter( new CountingDiscardOutputStream(), writerOptions)) { estimator.writeBatch(root); @@ -347,13 +486,32 @@ public class MaxComputeJniWriter extends JniWriter { } } - private VectorSchemaRoot buildRowRangeRoot(VectorTable inputTable, int numCols, int rowStart, int rowEnd) { + private VectorSchemaRoot buildRowRangeRoot(VectorTable inputTable, int numCols, int rowStart, int rowEnd, + boolean logColumns) { int rowCount = rowEnd - rowStart; VectorSchemaRoot root = batchWriter.newElement(); root.setRowCount(rowCount); for (int col = 0; col < numCols && col < columnTypeInfos.size(); col++) { + long columnStartNs = System.nanoTime(); + if (logColumns) { + logDiag("JAVA_BUILD_COLUMN_BEFORE", + "columnIndex=" + col + + ", columnName=" + columnNames.get(col) + + ", odpsType=" + columnTypeInfos.get(col).getOdpsType() + + ", rowStart=" + rowStart + + ", rowCount=" + rowCount); + } fillArrowVectorStreaming(root, col, columnTypeInfos.get(col).getOdpsType(), inputTable.getColumn(col), rowStart, rowCount); + if (logColumns) { + logDiag("JAVA_BUILD_COLUMN_AFTER", + "columnIndex=" + col + + ", columnName=" + columnNames.get(col) + + ", odpsType=" + columnTypeInfos.get(col).getOdpsType() + + ", rowStart=" + rowStart + + ", rowCount=" + rowCount + + ", costMs=" + elapsedMs(columnStartNs)); + } } return root; } @@ -904,28 +1062,42 @@ public class MaxComputeJniWriter extends JniWriter { @Override public void close() throws IOException { + long closeStartNs = System.nanoTime(); + logDiag("JAVA_CLOSE_ENTER", ""); try { closeCurrentBatchWriterAndCollectCommit(); if (allocator != null) { + long allocatorStartNs = System.nanoTime(); + logDiag("JAVA_ALLOCATOR_CLOSE_BEFORE", ""); allocator.close(); allocator = null; + logDiag("JAVA_ALLOCATOR_CLOSE_AFTER", "costMs=" + elapsedMs(allocatorStartNs)); } LOG.info("MaxComputeJniWriter closed: writeSessionId=" + writeSessionId + ", partitionSpec=" + partitionSpec + ", writtenRows=" + writtenRows + ", lastBlockId=" + currentBlockId + ", commitMessageCount=" + commitMessages.size()); + logDiag("JAVA_CLOSE_EXIT", "costMs=" + elapsedMs(closeStartNs)); } catch (Exception e) { String errorMsg = "Failed to close MaxCompute arrow writer"; + logDiag("JAVA_CLOSE_ERROR", + "costMs=" + elapsedMs(closeStartNs) + + ", error=" + e.getClass().getName() + ": " + e.getMessage()); LOG.error(errorMsg, e); throw new IOException(errorMsg, e); } finally { + long feClientCloseStartNs = System.nanoTime(); + logDiag("JAVA_FE_CLIENT_CLOSE_BEFORE", ""); feClient.close(); + logDiag("JAVA_FE_CLIENT_CLOSE_AFTER", "costMs=" + elapsedMs(feClientCloseStartNs)); } } @Override public Map<String, String> getStatistics() { + long startNs = System.nanoTime(); + logDiag("JAVA_GET_STATISTICS_ENTER", "commitMessageCount=" + commitMessages.size()); Map<String, String> stats = new HashMap<>(); stats.put("mc_partition_spec", partitionSpec != null ? partitionSpec : ""); @@ -936,8 +1108,14 @@ public class MaxComputeJniWriter extends JniWriter { ObjectOutputStream oos = new ObjectOutputStream(baos); oos.writeObject(commitMessages); oos.close(); - stats.put("mc_commit_message", Base64.getEncoder().encodeToString(baos.toByteArray())); + String encoded = Base64.getEncoder().encodeToString(baos.toByteArray()); + stats.put("mc_commit_message", encoded); + logDiag("JAVA_GET_STATISTICS_COMMIT_MESSAGE_SERIALIZED", + "commitMessageCount=" + commitMessages.size() + + ", encodedLength=" + encoded.length()); } catch (IOException e) { + logDiag("JAVA_GET_STATISTICS_ERROR", + "error=" + e.getClass().getName() + ": " + e.getMessage()); LOG.error("Failed to serialize WriterCommitMessages", e); } } @@ -946,6 +1124,29 @@ public class MaxComputeJniWriter extends JniWriter { stats.put("bytes:WrittenBytes", String.valueOf(writtenBytes)); stats.put("timer:WriteTime", String.valueOf(writeTime)); stats.put("timer:ReadTableTime", String.valueOf(readTableTime)); + logDiag("JAVA_GET_STATISTICS_EXIT", + "statsSize=" + stats.size() + + ", costMs=" + elapsedMs(startNs)); return stats; } + + private void logDiag(String stage, String detail) { + String message = "MC_DIAG stage=" + stage + + ", table=" + tableName + + ", txnId=" + txnId + + ", writeSessionId=" + writeSessionId + + ", partitionSpec=" + partitionSpec + + ", blockId=" + currentBlockId + + ", writtenRows=" + writtenRows + + ", writtenBytes=" + writtenBytes + + ", thread=" + Thread.currentThread().getName(); + if (detail != null && !detail.isEmpty()) { + message += ", " + detail; + } + LOG.info(message); + } + + private static long elapsedMs(long startNs) { + return (System.nanoTime() - startNs) / 1_000_000L; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java index 9f1c61ddf24..fa9fcefab81 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java @@ -64,8 +64,24 @@ public class MCTransaction implements Transaction { } public void updateMCCommitData(List<TMCCommitData> commitDataList) { + long incomingRows = 0; + int incomingCommitMessages = 0; + long incomingCommitMessageBytes = 0; + for (TMCCommitData data : commitDataList) { + incomingRows += data.getRowCount(); + if (data.isSetCommitMessage() && !data.getCommitMessage().isEmpty()) { + incomingCommitMessages++; + incomingCommitMessageBytes += data.getCommitMessage().length(); + } + } synchronized (this) { + LOG.info("MC_DIAG stage=FE_UPDATE_COMMIT_DATA table={} sessionId={} incomingDatas={} incomingRows={}" + + " incomingCommitMessages={} incomingCommitMessageBytes={} existingDatas={}", + table == null ? "null" : table.getName(), writeSessionId, commitDataList.size(), incomingRows, + incomingCommitMessages, incomingCommitMessageBytes, this.commitDataList.size()); this.commitDataList.addAll(commitDataList); + LOG.info("MC_DIAG stage=FE_UPDATE_COMMIT_DATA_DONE table={} sessionId={} totalDatas={}", + table == null ? "null" : table.getName(), writeSessionId, this.commitDataList.size()); } } @@ -73,7 +89,15 @@ public class MCTransaction implements Transaction { this.table = (MaxComputeExternalTable) dorisTable; try { + long beginStartMs = System.currentTimeMillis(); + LOG.info("MC_DIAG stage=FE_BEGIN_INSERT_START db={} table={}", + table.getDbName(), table.getName()); + long tableIdStartMs = System.currentTimeMillis(); + LOG.info("MC_DIAG stage=FE_GET_ODPS_TABLE_ID_BEFORE db={} table={}", + table.getDbName(), table.getName()); TableIdentifier tableId = catalog.getOdpsTableIdentifier(table.getDbName(), table.getName()); + LOG.info("MC_DIAG stage=FE_GET_ODPS_TABLE_ID_AFTER db={} table={} costMs={}", + table.getDbName(), table.getName(), System.currentTimeMillis() - tableIdStartMs); boolean isDynamicPartition = !table.getPartitionColumns().isEmpty(); boolean isStaticPartition = false; @@ -94,6 +118,10 @@ public class MCTransaction implements Transaction { } isOverwrite = mcCtx.isOverwrite(); } + LOG.info("MC_DIAG stage=FE_BEGIN_INSERT_ENTER table={}.{} dynamicPartition={} staticPartition={}" + + " staticPartitionSpec={} overwrite={} partitionColumns={}", + catalog.getDefaultProject(), table.getName(), isDynamicPartition, isStaticPartition, + staticPartitionSpecStr, isOverwrite, table.getPartitionColumns().size()); TableWriteSessionBuilder builder = new TableWriteSessionBuilder() .identifier(tableId) @@ -114,13 +142,25 @@ public class MCTransaction implements Transaction { builder.overwrite(true); } + long buildSessionStartMs = System.currentTimeMillis(); + LOG.info("MC_DIAG stage=FE_BUILD_WRITE_SESSION_BEFORE table={}.{} dynamicPartition={}" + + " staticPartition={} overwrite={}", + catalog.getDefaultProject(), table.getName(), isDynamicPartition, isStaticPartition, isOverwrite); TableBatchWriteSession writeSession = builder.buildBatchWriteSession(); writeSessionId = writeSession.getId(); nextBlockId.set(0); + LOG.info("MC_DIAG stage=FE_BUILD_WRITE_SESSION_AFTER table={}.{} sessionId={} costMs={}", + catalog.getDefaultProject(), table.getName(), writeSessionId, + System.currentTimeMillis() - buildSessionStartMs); LOG.info("Created MC Storage API write session: {} for table {}.{}", writeSessionId, catalog.getDefaultProject(), table.getName()); + LOG.info("MC_DIAG stage=FE_BEGIN_INSERT_EXIT table={}.{} sessionId={} costMs={}", + catalog.getDefaultProject(), table.getName(), writeSessionId, + System.currentTimeMillis() - beginStartMs); } catch (Exception e) { + LOG.warn("MC_DIAG stage=FE_BEGIN_INSERT_ERROR table={} error={}: {}", + dorisTable.getName(), e.getClass().getName(), e.getMessage(), e); throw new UserException("Failed to begin insert for MaxCompute table " + dorisTable.getName() + ": " + e.getMessage(), e); } @@ -131,6 +171,11 @@ public class MCTransaction implements Transaction { } public long allocateBlockIdRange(String requestWriteSessionId, long length) throws UserException { + long startMs = System.currentTimeMillis(); + LOG.info("MC_DIAG stage=FE_ALLOCATE_BLOCK_ID_ENTER table={} sessionId={} requestSessionId={} length={}" + + " nextBlockId={}", + table == null ? "null" : table.getName(), writeSessionId, requestWriteSessionId, length, + nextBlockId.get()); if (length <= 0) { throw new UserException("MaxCompute block_id allocation length must be positive: " + length); } @@ -156,29 +201,48 @@ public class MCTransaction implements Transaction { LOG.info("Allocated MaxCompute block_id range: sessionId={}, start={}, length={}", writeSessionId, start, length); + LOG.info("MC_DIAG stage=FE_ALLOCATE_BLOCK_ID_EXIT table={} sessionId={} start={} length={} nextBlockId={}" + + " costMs={}", + table == null ? "null" : table.getName(), writeSessionId, start, length, nextBlockId.get(), + System.currentTimeMillis() - startMs); return start; } private void appendCommitMessages(List<WriterCommitMessage> allMessages, String encodedCommitMessage) throws Exception { + long startMs = System.currentTimeMillis(); + LOG.info("MC_DIAG stage=FE_APPEND_COMMIT_MESSAGE_BEFORE table={} sessionId={} encodedLength={}", + table == null ? "null" : table.getName(), writeSessionId, encodedCommitMessage.length()); byte[] bytes = Base64.getDecoder().decode(encodedCommitMessage); ByteArrayInputStream bais = new ByteArrayInputStream(bytes); ObjectInputStream ois = new ObjectInputStream(bais); Object payload = ois.readObject(); ois.close(); + LOG.info("MC_DIAG stage=FE_APPEND_COMMIT_MESSAGE_DECODED table={} sessionId={} payloadType={}" + + " decodedBytes={} costMs={}", + table == null ? "null" : table.getName(), writeSessionId, + payload == null ? "null" : payload.getClass().getName(), bytes.length, + System.currentTimeMillis() - startMs); if (payload instanceof WriterCommitMessage) { allMessages.add((WriterCommitMessage) payload); + LOG.info("MC_DIAG stage=FE_APPEND_COMMIT_MESSAGE_AFTER table={} sessionId={} added=1 totalMessages={}", + table == null ? "null" : table.getName(), writeSessionId, allMessages.size()); return; } if (payload instanceof List<?>) { + int added = 0; for (Object item : (List<?>) payload) { if (!(item instanceof WriterCommitMessage)) { throw new UserException("Unexpected MaxCompute commit payload item type: " + (item == null ? "null" : item.getClass().getName())); } allMessages.add((WriterCommitMessage) item); + added++; } + LOG.info("MC_DIAG stage=FE_APPEND_COMMIT_MESSAGE_AFTER table={} sessionId={} added={}" + + " totalMessages={}", + table == null ? "null" : table.getName(), writeSessionId, added, allMessages.size()); return; } throw new UserException("Unexpected MaxCompute commit payload type: " @@ -188,33 +252,69 @@ public class MCTransaction implements Transaction { public void finishInsert() throws UserException { try { long t0 = System.currentTimeMillis(); + LOG.info("MC_DIAG stage=FE_FINISH_INSERT_ENTER table={}.{} sessionId={} commitDataCount={}" + + " updateRows={}", + catalog.getDefaultProject(), table.getName(), writeSessionId, commitDataList.size(), + getUpdateCnt()); // Collect all WriterCommitMessages from BEs List<WriterCommitMessage> allMessages = new ArrayList<>(); synchronized (this) { + LOG.info("MC_DIAG stage=FE_DESERIALIZE_COMMIT_MESSAGES_BEFORE table={}.{} sessionId={}" + + " commitDataCount={}", + catalog.getDefaultProject(), table.getName(), writeSessionId, commitDataList.size()); for (TMCCommitData data : commitDataList) { if (data.isSetCommitMessage() && !data.getCommitMessage().isEmpty()) { + LOG.info("MC_DIAG stage=FE_DESERIALIZE_COMMIT_MESSAGE_ITEM table={}.{} sessionId={}" + + " partitionSpec={} rowCount={} commitMessageLength={}", + catalog.getDefaultProject(), table.getName(), writeSessionId, + data.isSetPartitionSpec() ? data.getPartitionSpec() : "", + data.getRowCount(), data.getCommitMessage().length()); appendCommitMessages(allMessages, data.getCommitMessage()); } } } long t1 = System.currentTimeMillis(); + LOG.info("MC_DIAG stage=FE_DESERIALIZE_COMMIT_MESSAGES_AFTER table={}.{} sessionId={}" + + " writerCommitMessages={} costMs={}", + catalog.getDefaultProject(), table.getName(), writeSessionId, allMessages.size(), t1 - t0); // Restore session and commit all messages + long tableIdStartMs = System.currentTimeMillis(); + LOG.info("MC_DIAG stage=FE_COMMIT_GET_ODPS_TABLE_ID_BEFORE table={}.{} sessionId={}", + catalog.getDefaultProject(), table.getName(), writeSessionId); TableIdentifier tableId = catalog.getOdpsTableIdentifier(table.getDbName(), table.getName()); + LOG.info("MC_DIAG stage=FE_COMMIT_GET_ODPS_TABLE_ID_AFTER table={}.{} sessionId={} costMs={}", + catalog.getDefaultProject(), table.getName(), writeSessionId, + System.currentTimeMillis() - tableIdStartMs); + LOG.info("MC_DIAG stage=FE_RESTORE_COMMIT_SESSION_BEFORE table={}.{} sessionId={}", + catalog.getDefaultProject(), table.getName(), writeSessionId); TableBatchWriteSession commitSession = new TableWriteSessionBuilder() .identifier(tableId) .withSessionId(writeSessionId) .withSettings(catalog.getSettings()) .buildBatchWriteSession(); long t2 = System.currentTimeMillis(); + LOG.info("MC_DIAG stage=FE_RESTORE_COMMIT_SESSION_AFTER table={}.{} sessionId={} costMs={}", + catalog.getDefaultProject(), table.getName(), writeSessionId, t2 - t1); + LOG.info("MC_DIAG stage=FE_COMMIT_SESSION_BEFORE table={}.{} sessionId={} writerCommitMessages={}", + catalog.getDefaultProject(), table.getName(), writeSessionId, allMessages.size()); commitSession.commit(allMessages.toArray(new WriterCommitMessage[0])); long t3 = System.currentTimeMillis(); + LOG.info("MC_DIAG stage=FE_COMMIT_SESSION_AFTER table={}.{} sessionId={} writerCommitMessages={}" + + " costMs={}", + catalog.getDefaultProject(), table.getName(), writeSessionId, allMessages.size(), t3 - t2); LOG.info("Committed MC write session {} with {} messages for table {}.{}" + " Breakdown: deserialize={}ms, restoreSession={}ms, commit={}ms, total={}ms", writeSessionId, allMessages.size(), catalog.getDefaultProject(), table.getName(), t1 - t0, t2 - t1, t3 - t2, t3 - t0); + LOG.info("MC_DIAG stage=FE_FINISH_INSERT_EXIT table={}.{} sessionId={} writerCommitMessages={}" + + " totalCostMs={}", + catalog.getDefaultProject(), table.getName(), writeSessionId, allMessages.size(), t3 - t0); } catch (Exception e) { + LOG.warn("MC_DIAG stage=FE_FINISH_INSERT_ERROR table={}.{} sessionId={} error={}: {}", + catalog.getDefaultProject(), table == null ? "null" : table.getName(), writeSessionId, + e.getClass().getName(), e.getMessage(), e); throw new UserException("Failed to commit MaxCompute write session: " + e.getMessage(), e); } } @@ -222,11 +322,15 @@ public class MCTransaction implements Transaction { @Override public void commit() throws UserException { // commit is handled in finishInsert() + LOG.info("MC_DIAG stage=FE_TRANSACTION_COMMIT_NOOP table={} sessionId={}", + table == null ? "null" : table.getName(), writeSessionId); } @Override public void rollback() { // MC sessions auto-expire if not committed; no explicit rollback needed + LOG.info("MC_DIAG stage=FE_TRANSACTION_ROLLBACK table={} sessionId={} commitDataCount={}", + table == null ? "null" : table.getName(), writeSessionId, commitDataList.size()); LOG.info("MCTransaction rollback called; uncommitted sessions will auto-expire."); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/MCInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/MCInsertExecutor.java index 47df06485e7..7d6cb608d31 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/MCInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/MCInsertExecutor.java @@ -18,6 +18,7 @@ package org.apache.doris.nereids.trees.plans.commands.insert; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugUtil; import org.apache.doris.datasource.maxcompute.MCTransaction; import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable; import org.apache.doris.nereids.NereidsPlanner; @@ -52,29 +53,69 @@ public class MCInsertExecutor extends BaseExternalTableInsertExecutor { @Override protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) { + LOG.info("MC_DIAG stage=FE_MC_FINALIZE_SINK_ENTER queryId={} label={} table={}", + DebugUtil.printId(ctx.queryId()), labelName, table.getName()); // Let parent call bindDataSink() to build the Thrift sink super.finalizeSink(fragment, sink, physicalSink); // Save reference so beforeExec() can inject writeSessionId later mcTableSink = (MaxComputeTableSink) sink; + LOG.info("MC_DIAG stage=FE_MC_FINALIZE_SINK_EXIT queryId={} label={} table={} hasSink={}", + DebugUtil.printId(ctx.queryId()), labelName, table.getName(), mcTableSink != null); } @Override protected void beforeExec() throws UserException { + long startMs = System.currentTimeMillis(); + LOG.info("MC_DIAG stage=FE_MC_BEFORE_EXEC_ENTER queryId={} label={} table={} txnId={}", + DebugUtil.printId(ctx.queryId()), labelName, table.getName(), txnId); // 1. Create Storage API write session as part of the transaction MCTransaction transaction = (MCTransaction) transactionManager.getTransaction(txnId); + long beginInsertStartMs = System.currentTimeMillis(); + LOG.info("MC_DIAG stage=FE_MC_BEGIN_INSERT_BEFORE queryId={} label={} table={} txnId={}", + DebugUtil.printId(ctx.queryId()), labelName, table.getName(), txnId); transaction.beginInsert((MaxComputeExternalTable) table, insertCtx); + LOG.info("MC_DIAG stage=FE_MC_BEGIN_INSERT_AFTER queryId={} label={} table={} txnId={} sessionId={}" + + " costMs={}", + DebugUtil.printId(ctx.queryId()), labelName, table.getName(), txnId, + transaction.getWriteSessionId(), System.currentTimeMillis() - beginInsertStartMs); // 2. Inject write context into the Thrift sink before fragments are sent to BE if (mcTableSink != null) { + LOG.info("MC_DIAG stage=FE_MC_SET_WRITE_CONTEXT_BEFORE queryId={} label={} table={} txnId={}" + + " sessionId={}", + DebugUtil.printId(ctx.queryId()), labelName, table.getName(), txnId, + transaction.getWriteSessionId()); mcTableSink.setWriteContext(txnId, transaction.getWriteSessionId()); + LOG.info("MC_DIAG stage=FE_MC_SET_WRITE_CONTEXT_AFTER queryId={} label={} table={} txnId={}" + + " sessionId={}", + DebugUtil.printId(ctx.queryId()), labelName, table.getName(), txnId, + transaction.getWriteSessionId()); + } else { + LOG.info("MC_DIAG stage=FE_MC_SET_WRITE_CONTEXT_SKIP queryId={} label={} table={} txnId={}", + DebugUtil.printId(ctx.queryId()), labelName, table.getName(), txnId); } + LOG.info("MC_DIAG stage=FE_MC_BEFORE_EXEC_EXIT queryId={} label={} table={} txnId={} sessionId={}" + + " costMs={}", + DebugUtil.printId(ctx.queryId()), labelName, table.getName(), txnId, + transaction.getWriteSessionId(), System.currentTimeMillis() - startMs); } @Override protected void doBeforeCommit() throws UserException { + long startMs = System.currentTimeMillis(); + LOG.info("MC_DIAG stage=FE_MC_DO_BEFORE_COMMIT_ENTER queryId={} label={} table={} txnId={}", + DebugUtil.printId(ctx.queryId()), labelName, table.getName(), txnId); MCTransaction transaction = (MCTransaction) transactionManager.getTransaction(txnId); loadedRows = transaction.getUpdateCnt(); + LOG.info("MC_DIAG stage=FE_MC_FINISH_INSERT_BEFORE queryId={} label={} table={} txnId={} loadedRows={}" + + " sessionId={}", + DebugUtil.printId(ctx.queryId()), labelName, table.getName(), txnId, loadedRows, + transaction.getWriteSessionId()); transaction.finishInsert(); + LOG.info("MC_DIAG stage=FE_MC_FINISH_INSERT_AFTER queryId={} label={} table={} txnId={} loadedRows={}" + + " sessionId={} costMs={}", + DebugUtil.printId(ctx.queryId()), labelName, table.getName(), txnId, loadedRows, + transaction.getWriteSessionId(), System.currentTimeMillis() - startMs); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/MaxComputeTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/MaxComputeTableSink.java index 98537fa0307..93be5a914fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/MaxComputeTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/MaxComputeTableSink.java @@ -28,6 +28,9 @@ import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TMaxComputeTableSink; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.util.HashSet; import java.util.List; import java.util.Map; @@ -37,6 +40,8 @@ import java.util.stream.Collectors; public class MaxComputeTableSink extends BaseExternalTableDataSink { + private static final Logger LOG = LogManager.getLogger(MaxComputeTableSink.class); + private final MaxComputeExternalTable targetTable; public MaxComputeTableSink(MaxComputeExternalTable targetTable) { @@ -97,6 +102,11 @@ public class MaxComputeTableSink extends BaseExternalTableDataSink { tDataSink = new TDataSink(TDataSinkType.MAXCOMPUTE_TABLE_SINK); tDataSink.setMaxComputeTableSink(tSink); + LOG.info("MC_DIAG stage=FE_MC_BIND_SINK table={} project={} endpoint={} partitionColumns={}" + + " hasStaticPartition={} connectTimeout={} readTimeout={} retryCount={}", + targetTable.getName(), catalog.getDefaultProject(), catalog.getEndpoint(), + partitionColumnNames.size(), tSink.isSetStaticPartitionSpec(), catalog.getConnectTimeout(), + catalog.getReadTimeout(), catalog.getRetryTimes()); } /** @@ -106,8 +116,15 @@ public class MaxComputeTableSink extends BaseExternalTableDataSink { */ public void setWriteContext(long txnId, String writeSessionId) { if (tDataSink != null && tDataSink.isSetMaxComputeTableSink()) { + LOG.info("MC_DIAG stage=FE_MC_SET_SINK_WRITE_CONTEXT_BEFORE table={} txnId={} sessionId={}", + targetTable.getName(), txnId, writeSessionId); tDataSink.getMaxComputeTableSink().setTxnId(txnId); tDataSink.getMaxComputeTableSink().setWriteSessionId(writeSessionId); + LOG.info("MC_DIAG stage=FE_MC_SET_SINK_WRITE_CONTEXT_AFTER table={} txnId={} sessionId={}", + targetTable.getName(), txnId, writeSessionId); + } else { + LOG.info("MC_DIAG stage=FE_MC_SET_SINK_WRITE_CONTEXT_SKIP table={} txnId={} sessionId={}", + targetTable.getName(), txnId, writeSessionId); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index dac24f2f798..321a5aab912 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -2535,6 +2535,20 @@ public class Coordinator implements CoordInterface { .updateIcebergCommitData(params.getIcebergCommitDatas()); } if (params.isSetMcCommitDatas()) { + long mcRows = 0; + int mcCommitMessages = 0; + long mcCommitMessageBytes = 0; + for (org.apache.doris.thrift.TMCCommitData data : params.getMcCommitDatas()) { + mcRows += data.getRowCount(); + if (data.isSetCommitMessage() && !data.getCommitMessage().isEmpty()) { + mcCommitMessages++; + mcCommitMessageBytes += data.getCommitMessage().length(); + } + } + LOG.info("MC_DIAG stage=FE_COORDINATOR_MC_COMMIT_DATA queryId={} txnId={} fragmentId={} backendId={}" + + " datas={} rows={} commitMessages={} commitMessageBytes={}", + DebugUtil.printId(queryId), txnId, params.getFragmentId(), params.getBackendId(), + params.getMcCommitDatasSize(), mcRows, mcCommitMessages, mcCommitMessageBytes); ((MCTransaction) Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().getTxnById(txnId)) .updateMCCommitData(params.getMcCommitDatas()); } @@ -3554,4 +3568,3 @@ public class Coordinator implements CoordInterface { this.queryOptions.setEnableProfile(isSafe && queryOptions.isEnableProfile()); } } - diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java index 45995a7aad7..48c08700494 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java @@ -231,6 +231,21 @@ public class LoadProcessor extends AbstractJobProcessor { .updateIcebergCommitData(params.getIcebergCommitDatas()); } if (params.isSetMcCommitDatas()) { + long mcRows = 0; + int mcCommitMessages = 0; + long mcCommitMessageBytes = 0; + for (org.apache.doris.thrift.TMCCommitData data : params.getMcCommitDatas()) { + mcRows += data.getRowCount(); + if (data.isSetCommitMessage() && !data.getCommitMessage().isEmpty()) { + mcCommitMessages++; + mcCommitMessageBytes += data.getCommitMessage().length(); + } + } + LOG.info("MC_DIAG stage=FE_LOAD_PROCESSOR_MC_COMMIT_DATA queryId={} txnId={} fragmentId={}" + + " backendId={} datas={} rows={} commitMessages={} commitMessageBytes={}", + DebugUtil.printId(coordinatorContext.queryId), txnId, params.getFragmentId(), + params.getBackendId(), params.getMcCommitDatasSize(), mcRows, mcCommitMessages, + mcCommitMessageBytes); ((MCTransaction) Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().getTxnById(txnId)) .updateMCCommitData(params.getMcCommitDatas()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 3957eec4474..4f38242ffb7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2928,8 +2928,11 @@ public class FrontendServiceImpl implements FrontendService.Iface { @Override public TMaxComputeBlockIdResult getMaxComputeBlockIdRange(TMaxComputeBlockIdRequest request) { + long startMs = System.currentTimeMillis(); String clientAddr = getClientAddrAsString(); LOG.info("receive getMaxComputeBlockIdRange request: {}, backend: {}", request, clientAddr); + LOG.info("MC_DIAG stage=FE_BLOCK_ID_RPC_ENTER txnId={} sessionId={} length={} backend={}", + request.getTxnId(), request.getWriteSessionId(), request.getLength(), clientAddr); TMaxComputeBlockIdResult result = new TMaxComputeBlockIdResult(); TStatus status = checkMaster(); @@ -2937,10 +2940,16 @@ public class FrontendServiceImpl implements FrontendService.Iface { if (status.getStatusCode() != TStatusCode.OK) { result.setMasterAddress(getMasterAddress()); + LOG.info("MC_DIAG stage=FE_BLOCK_ID_RPC_NOT_MASTER txnId={} sessionId={} backend={} status={}" + + " master={} costMs={}", + request.getTxnId(), request.getWriteSessionId(), clientAddr, status.getStatusCode(), + result.getMasterAddress(), System.currentTimeMillis() - startMs); return result; } try { + LOG.info("MC_DIAG stage=FE_BLOCK_ID_GET_TXN_BEFORE txnId={} sessionId={} backend={}", + request.getTxnId(), request.getWriteSessionId(), clientAddr); Transaction transaction = Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr() .getTxnById(request.getTxnId()); if (!(transaction instanceof MCTransaction)) { @@ -2948,25 +2957,42 @@ public class FrontendServiceImpl implements FrontendService.Iface { + " is not a MaxCompute transaction"); } + LOG.info("MC_DIAG stage=FE_BLOCK_ID_ALLOCATE_BEFORE txnId={} sessionId={} length={} backend={}", + request.getTxnId(), request.getWriteSessionId(), request.getLength(), clientAddr); long start = ((MCTransaction) transaction).allocateBlockIdRange( request.getWriteSessionId(), request.getLength()); result.setStart(start); result.setLength(request.getLength()); + LOG.info("MC_DIAG stage=FE_BLOCK_ID_ALLOCATE_AFTER txnId={} sessionId={} start={} length={}" + + " backend={} costMs={}", + request.getTxnId(), request.getWriteSessionId(), start, request.getLength(), clientAddr, + System.currentTimeMillis() - startMs); } catch (UserException e) { + LOG.warn("MC_DIAG stage=FE_BLOCK_ID_RPC_USER_ERROR txnId={} sessionId={} backend={} error={}", + request.getTxnId(), request.getWriteSessionId(), clientAddr, e.getMessage(), e); LOG.warn("failed to allocate MaxCompute block_id, txnId={}, sessionId={}, errmsg={}", request.getTxnId(), request.getWriteSessionId(), e.getMessage()); status.setStatusCode(TStatusCode.ANALYSIS_ERROR); status.addToErrorMsgs(e.getMessage()); } catch (RuntimeException e) { + LOG.warn("MC_DIAG stage=FE_BLOCK_ID_RPC_RUNTIME_ERROR txnId={} sessionId={} backend={} error={}", + request.getTxnId(), request.getWriteSessionId(), clientAddr, e.getMessage(), e); LOG.warn("failed to allocate MaxCompute block_id, txnId={}, sessionId={}, errmsg={}", request.getTxnId(), request.getWriteSessionId(), e.getMessage(), e); status.setStatusCode(TStatusCode.ANALYSIS_ERROR); status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage())); } catch (Throwable e) { + LOG.warn("MC_DIAG stage=FE_BLOCK_ID_RPC_UNKNOWN_ERROR txnId={} sessionId={} backend={} error={}", + request.getTxnId(), request.getWriteSessionId(), clientAddr, e.getMessage(), e); LOG.warn("catch unknown result when allocating MaxCompute block_id.", e); status.setStatusCode(TStatusCode.INTERNAL_ERROR); status.addToErrorMsgs(e.getClass().getSimpleName() + ": " + Strings.nullToEmpty(e.getMessage())); } + LOG.info("MC_DIAG stage=FE_BLOCK_ID_RPC_EXIT txnId={} sessionId={} status={} startSet={} start={} lengthSet={}" + + " length={} backend={} costMs={}", + request.getTxnId(), request.getWriteSessionId(), status.getStatusCode(), result.isSetStart(), + result.isSetStart() ? result.getStart() : -1, result.isSetLength(), + result.isSetLength() ? result.getLength() : -1, clientAddr, System.currentTimeMillis() - startMs); return result; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
