This is an automated email from the ASF dual-hosted git repository.

hubgeter pushed a commit to branch mc-test-branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/mc-test-branch-4.1 by this push:
     new f597ada252c [test](log)add some log for debug maxcompute
f597ada252c is described below

commit f597ada252ccfbc4acdf7a7cf4f28889f5272db9
Author: daidai <[email protected]>
AuthorDate: Mon May 25 18:02:27 2026 +0800

    [test](log)add some log for debug maxcompute
---
 be/src/exec/pipeline/pipeline_fragment_context.cpp |  52 +++++
 .../writer/maxcompute/vmc_partition_writer.cpp     | 131 ++++++++++-
 .../sink/writer/maxcompute/vmc_table_writer.cpp    | 193 +++++++++++++++-
 .../format/transformer/vjni_format_transformer.cpp | 255 ++++++++++++++++++---
 .../org/apache/doris/common/jni/JniWriter.java     |  45 +++-
 .../doris/maxcompute/MaxComputeFeClient.java       |  68 +++++-
 .../doris/maxcompute/MaxComputeJniWriter.java      | 213 ++++++++++++++++-
 .../doris/datasource/maxcompute/MCTransaction.java | 104 +++++++++
 .../plans/commands/insert/MCInsertExecutor.java    |  41 ++++
 .../apache/doris/planner/MaxComputeTableSink.java  |  17 ++
 .../main/java/org/apache/doris/qe/Coordinator.java |  15 +-
 .../org/apache/doris/qe/runtime/LoadProcessor.java |  15 ++
 .../apache/doris/service/FrontendServiceImpl.java  |  26 +++
 13 files changed, 1123 insertions(+), 52 deletions(-)

diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index c8f83ad0781..9d13a8bcaf4 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -2168,15 +2168,43 @@ void PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& r
     if (auto mcd = req.runtime_state->mc_commit_datas(); !mcd.empty()) {
         params.__isset.mc_commit_datas = true;
         params.mc_commit_datas.insert(params.mc_commit_datas.end(), 
mcd.begin(), mcd.end());
+        LOG(INFO) << "MC_DIAG stage=BE_REPORT_MC_COMMIT_DATA_FROM_PRIMARY"
+                  << ", query_id=" << print_id(req.query_id)
+                  << ", fragment_instance_id=" << 
print_id(req.fragment_instance_id)
+                  << ", commit_datas=" << mcd.size();
     } else if (!req.runtime_states.empty()) {
         for (auto* rs : req.runtime_states) {
             if (auto rs_mcd = rs->mc_commit_datas(); !rs_mcd.empty()) {
                 params.__isset.mc_commit_datas = true;
                 params.mc_commit_datas.insert(params.mc_commit_datas.end(), 
rs_mcd.begin(),
                                               rs_mcd.end());
+                LOG(INFO) << "MC_DIAG 
stage=BE_REPORT_MC_COMMIT_DATA_FROM_RUNTIME_STATE"
+                          << ", query_id=" << print_id(req.query_id)
+                          << ", fragment_instance_id=" << 
print_id(req.fragment_instance_id)
+                          << ", commit_datas=" << rs_mcd.size();
             }
         }
     }
+    if (params.__isset.mc_commit_datas) {
+        int64_t mc_rows = 0;
+        int64_t mc_commit_messages = 0;
+        int64_t mc_commit_message_bytes = 0;
+        for (const auto& data : params.mc_commit_datas) {
+            if (data.__isset.row_count) {
+                mc_rows += data.row_count;
+            }
+            if (data.__isset.commit_message && !data.commit_message.empty()) {
+                mc_commit_messages++;
+                mc_commit_message_bytes += data.commit_message.size();
+            }
+        }
+        LOG(INFO) << "MC_DIAG stage=BE_REPORT_MC_COMMIT_DATA_READY"
+                  << ", query_id=" << print_id(req.query_id)
+                  << ", fragment_instance_id=" << 
print_id(req.fragment_instance_id)
+                  << ", commit_datas=" << params.mc_commit_datas.size()
+                  << ", rows=" << mc_rows << ", commit_messages=" << 
mc_commit_messages
+                  << ", commit_message_bytes=" << mc_commit_message_bytes;
+    }
 
     req.runtime_state->get_unreported_errors(&(params.error_log));
     params.__isset.error_log = (!params.error_log.empty());
@@ -2197,7 +2225,19 @@ void PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& r
     }
     try {
         try {
+            LOG(INFO) << "MC_DIAG stage=BE_REPORT_EXEC_STATUS_BEFORE"
+                      << ", query_id=" << print_id(req.query_id)
+                      << ", fragment_instance_id=" << 
print_id(req.fragment_instance_id)
+                      << ", has_mc_commit_datas=" << 
params.__isset.mc_commit_datas
+                      << ", mc_commit_datas="
+                      << (params.__isset.mc_commit_datas ? 
params.mc_commit_datas.size() : 0);
             (*coord)->reportExecStatus(res, params);
+            LOG(INFO) << "MC_DIAG stage=BE_REPORT_EXEC_STATUS_AFTER"
+                      << ", query_id=" << print_id(req.query_id)
+                      << ", fragment_instance_id=" << 
print_id(req.fragment_instance_id)
+                      << ", has_mc_commit_datas=" << 
params.__isset.mc_commit_datas
+                      << ", mc_commit_datas="
+                      << (params.__isset.mc_commit_datas ? 
params.mc_commit_datas.size() : 0);
         } catch ([[maybe_unused]] 
apache::thrift::transport::TTransportException& e) {
 #ifndef ADDRESS_SANITIZER
             LOG(WARNING) << "Retrying ReportExecStatus. query id: " << 
print_id(req.query_id)
@@ -2210,7 +2250,19 @@ void PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& r
                 req.cancel_fn(rpc_status);
                 return;
             }
+            LOG(INFO) << "MC_DIAG stage=BE_REPORT_EXEC_STATUS_RETRY_BEFORE"
+                      << ", query_id=" << print_id(req.query_id)
+                      << ", fragment_instance_id=" << 
print_id(req.fragment_instance_id)
+                      << ", has_mc_commit_datas=" << 
params.__isset.mc_commit_datas
+                      << ", mc_commit_datas="
+                      << (params.__isset.mc_commit_datas ? 
params.mc_commit_datas.size() : 0);
             (*coord)->reportExecStatus(res, params);
+            LOG(INFO) << "MC_DIAG stage=BE_REPORT_EXEC_STATUS_RETRY_AFTER"
+                      << ", query_id=" << print_id(req.query_id)
+                      << ", fragment_instance_id=" << 
print_id(req.fragment_instance_id)
+                      << ", has_mc_commit_datas=" << 
params.__isset.mc_commit_datas
+                      << ", mc_commit_datas="
+                      << (params.__isset.mc_commit_datas ? 
params.mc_commit_datas.size() : 0);
         }
 
         rpc_status = Status::create<false>(res.status);
diff --git a/be/src/exec/sink/writer/maxcompute/vmc_partition_writer.cpp b/be/src/exec/sink/writer/maxcompute/vmc_partition_writer.cpp
index ef1e6f069da..349df0e7067 100644
--- a/be/src/exec/sink/writer/maxcompute/vmc_partition_writer.cpp
+++ b/be/src/exec/sink/writer/maxcompute/vmc_partition_writer.cpp
@@ -19,9 +19,20 @@
 
 #include "format/transformer/vjni_format_transformer.h"
 #include "runtime/runtime_state.h"
+#include "util/time.h"
+#include "util/uid_util.h"
 
 namespace doris {
 
+namespace {
+
+std::string mc_diag_param(const std::map<std::string, std::string>& params, 
const std::string& key) {
+    auto it = params.find(key);
+    return it == params.end() ? "" : it->second;
+}
+
+} // namespace
+
 VMCPartitionWriter::VMCPartitionWriter(RuntimeState* state,
                                        const VExprContextSPtrs& 
output_vexpr_ctxs,
                                        const std::string& partition_spec,
@@ -32,30 +43,120 @@ VMCPartitionWriter::VMCPartitionWriter(RuntimeState* state,
           _writer_params(std::move(writer_params)) {}
 
 Status VMCPartitionWriter::open() {
+    int64_t start_ms = MonotonicMillis();
+    LOG(INFO) << "MC_DIAG stage=BE_PARTITION_WRITER_OPEN_ENTER"
+              << ", fragment_instance_id=" << 
print_id(_state->fragment_instance_id())
+              << ", table=" << mc_diag_param(_writer_params, "table")
+              << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+              << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id")
+              << ", partition_spec=" << _partition_spec;
     _jni_format_transformer = std::make_unique<VJniFormatTransformer>(
             _state, _output_vexpr_ctxs, 
"org/apache/doris/maxcompute/MaxComputeJniWriter",
             _writer_params);
-    return _jni_format_transformer->open();
+    LOG(INFO) << "MC_DIAG stage=BE_PARTITION_JNI_OPEN_BEFORE"
+              << ", fragment_instance_id=" << 
print_id(_state->fragment_instance_id())
+              << ", table=" << mc_diag_param(_writer_params, "table")
+              << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+              << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id")
+              << ", partition_spec=" << _partition_spec;
+    Status status = _jni_format_transformer->open();
+    LOG(INFO) << "MC_DIAG stage=BE_PARTITION_JNI_OPEN_AFTER"
+              << ", fragment_instance_id=" << 
print_id(_state->fragment_instance_id())
+              << ", table=" << mc_diag_param(_writer_params, "table")
+              << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+              << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id")
+              << ", partition_spec=" << _partition_spec << ", status=" << 
status.to_string()
+              << ", cost_ms=" << MonotonicMillis() - start_ms;
+    return status;
 }
 
 Status VMCPartitionWriter::write(Block& block) {
-    RETURN_IF_ERROR(_jni_format_transformer->write(block));
+    int64_t start_ms = MonotonicMillis();
+    LOG(INFO) << "MC_DIAG stage=BE_PARTITION_WRITE_BEFORE"
+              << ", fragment_instance_id=" << 
print_id(_state->fragment_instance_id())
+              << ", table=" << mc_diag_param(_writer_params, "table")
+              << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+              << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id")
+              << ", partition_spec=" << _partition_spec << ", rows=" << 
block.rows()
+              << ", accumulated_rows=" << _row_count;
+    Status status = _jni_format_transformer->write(block);
+    LOG(INFO) << "MC_DIAG stage=BE_PARTITION_WRITE_AFTER"
+              << ", fragment_instance_id=" << 
print_id(_state->fragment_instance_id())
+              << ", table=" << mc_diag_param(_writer_params, "table")
+              << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+              << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id")
+              << ", partition_spec=" << _partition_spec << ", rows=" << 
block.rows()
+              << ", status=" << status.to_string()
+              << ", cost_ms=" << MonotonicMillis() - start_ms;
+    RETURN_IF_ERROR(status);
     _row_count += block.rows();
     return Status::OK();
 }
 
 Status VMCPartitionWriter::close(const Status& status) {
+    int64_t start_ms = MonotonicMillis();
+    LOG(INFO) << "MC_DIAG stage=BE_PARTITION_CLOSE_ENTER"
+              << ", fragment_instance_id=" << 
print_id(_state->fragment_instance_id())
+              << ", table=" << mc_diag_param(_writer_params, "table")
+              << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+              << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id")
+              << ", partition_spec=" << _partition_spec << ", input_status=" 
<< status.to_string()
+              << ", accumulated_rows=" << _row_count;
     Status result_status;
     if (_jni_format_transformer) {
+        LOG(INFO) << "MC_DIAG stage=BE_PARTITION_JNI_CLOSE_BEFORE"
+                  << ", fragment_instance_id=" << 
print_id(_state->fragment_instance_id())
+                  << ", table=" << mc_diag_param(_writer_params, "table")
+                  << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+                  << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id")
+                  << ", partition_spec=" << _partition_spec;
         result_status = _jni_format_transformer->close();
+        LOG(INFO) << "MC_DIAG stage=BE_PARTITION_JNI_CLOSE_AFTER"
+                  << ", fragment_instance_id=" << 
print_id(_state->fragment_instance_id())
+                  << ", table=" << mc_diag_param(_writer_params, "table")
+                  << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+                  << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id")
+                  << ", partition_spec=" << _partition_spec
+                  << ", status=" << result_status.to_string()
+                  << ", cost_ms=" << MonotonicMillis() - start_ms;
         if (!result_status.ok()) {
             LOG(WARNING) << "VMCPartitionWriter close failed: " << 
result_status.to_string();
         }
     }
     if (result_status.ok() && status.ok()) {
+        LOG(INFO) << "MC_DIAG stage=BE_PARTITION_BUILD_COMMIT_DATA_BEFORE"
+                  << ", fragment_instance_id=" << 
print_id(_state->fragment_instance_id())
+                  << ", table=" << mc_diag_param(_writer_params, "table")
+                  << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+                  << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id")
+                  << ", partition_spec=" << _partition_spec;
         auto commit_data = _build_mc_commit_data();
+        LOG(INFO) << "MC_DIAG stage=BE_PARTITION_ADD_COMMIT_DATA_BEFORE"
+                  << ", fragment_instance_id=" << 
print_id(_state->fragment_instance_id())
+                  << ", table=" << mc_diag_param(_writer_params, "table")
+                  << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+                  << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id")
+                  << ", partition_spec=" << _partition_spec
+                  << ", row_count=" << commit_data.row_count
+                  << ", has_commit_message=" << 
commit_data.__isset.commit_message
+                  << ", commit_message_length="
+                  << (commit_data.__isset.commit_message ? 
commit_data.commit_message.size() : 0)
+                  << ", has_written_bytes=" << 
commit_data.__isset.written_bytes;
         _state->add_mc_commit_datas(commit_data);
+        LOG(INFO) << "MC_DIAG stage=BE_PARTITION_ADD_COMMIT_DATA_AFTER"
+                  << ", fragment_instance_id=" << 
print_id(_state->fragment_instance_id())
+                  << ", table=" << mc_diag_param(_writer_params, "table")
+                  << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+                  << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id")
+                  << ", partition_spec=" << _partition_spec;
     }
+    LOG(INFO) << "MC_DIAG stage=BE_PARTITION_CLOSE_EXIT"
+              << ", fragment_instance_id=" << 
print_id(_state->fragment_instance_id())
+              << ", table=" << mc_diag_param(_writer_params, "table")
+              << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+              << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id")
+              << ", partition_spec=" << _partition_spec << ", status=" << 
result_status.to_string()
+              << ", cost_ms=" << MonotonicMillis() - start_ms;
     return result_status;
 }
 
@@ -66,7 +167,22 @@ TMCCommitData VMCPartitionWriter::_build_mc_commit_data() {
 
     // Get statistics from Java side via JNI getStatistics()
     if (_jni_format_transformer) {
+        int64_t start_ms = MonotonicMillis();
+        LOG(INFO) << "MC_DIAG stage=BE_PARTITION_GET_STATS_BEFORE"
+                  << ", fragment_instance_id=" << 
print_id(_state->fragment_instance_id())
+                  << ", table=" << mc_diag_param(_writer_params, "table")
+                  << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+                  << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id")
+                  << ", partition_spec=" << _partition_spec;
         auto statistics = _jni_format_transformer->get_statistics();
+        LOG(INFO) << "MC_DIAG stage=BE_PARTITION_GET_STATS_AFTER"
+                  << ", fragment_instance_id=" << 
print_id(_state->fragment_instance_id())
+                  << ", table=" << mc_diag_param(_writer_params, "table")
+                  << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+                  << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id")
+                  << ", partition_spec=" << _partition_spec
+                  << ", stats_size=" << statistics.size()
+                  << ", cost_ms=" << MonotonicMillis() - start_ms;
         auto it = statistics.find("mc_commit_message");
         if (it != statistics.end() && !it->second.empty()) {
             commit_data.__set_commit_message(it->second);
@@ -76,6 +192,17 @@ TMCCommitData VMCPartitionWriter::_build_mc_commit_data() {
             commit_data.__set_written_bytes(std::stoll(it->second));
         }
     }
+    LOG(INFO) << "MC_DIAG stage=BE_PARTITION_BUILD_COMMIT_DATA_AFTER"
+              << ", fragment_instance_id=" << 
print_id(_state->fragment_instance_id())
+              << ", table=" << mc_diag_param(_writer_params, "table")
+              << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+              << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id")
+              << ", partition_spec=" << _partition_spec
+              << ", row_count=" << commit_data.row_count
+              << ", has_commit_message=" << commit_data.__isset.commit_message
+              << ", commit_message_length="
+              << (commit_data.__isset.commit_message ? 
commit_data.commit_message.size() : 0)
+              << ", has_written_bytes=" << commit_data.__isset.written_bytes;
 
     return commit_data;
 }
diff --git a/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp b/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp
index f4a984dac05..c7805fb9bd4 100644
--- a/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp
+++ b/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp
@@ -24,11 +24,20 @@
 #include "format/transformer/vjni_format_transformer.h"
 #include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
+#include "util/time.h"
 #include "util/uid_util.h"
 
 namespace doris {
 #include "common/compile_check_begin.h"
 
+namespace {
+
+const char* mc_diag_bool(bool value) {
+    return value ? "true" : "false";
+}
+
+} // namespace
+
 VMCTableWriter::VMCTableWriter(const TDataSink& t_sink, const 
VExprContextSPtrs& output_expr_ctxs,
                                std::shared_ptr<Dependency> dep, 
std::shared_ptr<Dependency> fin_dep)
         : AsyncResultWriter(output_expr_ctxs, dep, fin_dep),
@@ -43,11 +52,19 @@ Status VMCTableWriter::init_properties(ObjectPool* pool) {
 
 Status VMCTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
     _state = state;
+    int64_t start_ms = MonotonicMillis();
 
     LOG(INFO) << "VMCTableWriter::open"
               << ", fragment_instance_id=" << 
print_id(state->fragment_instance_id())
               << ", per_fragment_instance_idx=" << 
state->per_fragment_instance_idx()
               << ", write_session_id=" << _mc_sink.write_session_id;
+    LOG(INFO) << "MC_DIAG stage=BE_TABLE_OPEN_ENTER"
+              << ", fragment_instance_id=" << 
print_id(state->fragment_instance_id())
+              << ", per_fragment_instance_idx=" << 
state->per_fragment_instance_idx()
+              << ", table=" << (_mc_sink.__isset.table_name ? 
_mc_sink.table_name : "")
+              << ", txn_id=" << (_mc_sink.__isset.txn_id ? 
std::to_string(_mc_sink.txn_id) : "")
+              << ", write_session_id="
+              << (_mc_sink.__isset.write_session_id ? 
_mc_sink.write_session_id : "");
 
     _written_rows_counter = ADD_COUNTER(_operator_profile, "WrittenRows", 
TUnit::UNIT);
     _send_data_timer = ADD_TIMER(_operator_profile, "SendDataTime");
@@ -95,6 +112,18 @@ Status VMCTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
         }
     }
 
+    LOG(INFO) << "MC_DIAG stage=BE_TABLE_OPEN_EXIT"
+              << ", fragment_instance_id=" << 
print_id(state->fragment_instance_id())
+              << ", per_fragment_instance_idx=" << 
state->per_fragment_instance_idx()
+              << ", table=" << (_mc_sink.__isset.table_name ? 
_mc_sink.table_name : "")
+              << ", txn_id=" << (_mc_sink.__isset.txn_id ? 
std::to_string(_mc_sink.txn_id) : "")
+              << ", write_session_id="
+              << (_mc_sink.__isset.write_session_id ? 
_mc_sink.write_session_id : "")
+              << ", partition_columns=" << _partition_column_names.size()
+              << ", has_static_partition=" << 
mc_diag_bool(_has_static_partition)
+              << ", static_partition_spec=" << _static_partition_spec
+              << ", write_exprs=" << _write_output_vexpr_ctxs.size()
+              << ", cost_ms=" << MonotonicMillis() - start_ms;
     return Status::OK();
 }
 
@@ -124,6 +153,15 @@ std::map<std::string, std::string> VMCTableWriter::_build_base_writer_params() {
     params["fe_port"] = std::to_string(master_fe_addr.port);
     params["fe_rpc_timeout_ms"] = 
std::to_string(config::thrift_rpc_timeout_ms);
     params["fe_thrift_server_type"] = config::thrift_server_type_of_fe;
+    LOG(INFO) << "MC_DIAG stage=BE_TABLE_BUILD_WRITER_PARAMS"
+              << ", fragment_instance_id=" << 
print_id(_state->fragment_instance_id())
+              << ", table=" << (_mc_sink.__isset.table_name ? 
_mc_sink.table_name : "")
+              << ", txn_id=" << (_mc_sink.__isset.txn_id ? 
std::to_string(_mc_sink.txn_id) : "")
+              << ", write_session_id="
+              << (_mc_sink.__isset.write_session_id ? 
_mc_sink.write_session_id : "")
+              << ", fe=" << master_fe_addr.hostname << ":" << 
master_fe_addr.port
+              << ", read_timeout=" << (_mc_sink.__isset.read_timeout ? 
_mc_sink.read_timeout : 0)
+              << ", retry_count=" << (_mc_sink.__isset.retry_count ? 
_mc_sink.retry_count : 0);
     return params;
 }
 
@@ -134,6 +172,13 @@ std::shared_ptr<VMCPartitionWriter> VMCTableWriter::_create_partition_writer(
     LOG(INFO) << "VMCTableWriter::_create_partition_writer"
               << ", fragment_instance_id=" << 
print_id(_state->fragment_instance_id())
               << ", partition_spec=" << partition_spec;
+    LOG(INFO) << "MC_DIAG stage=BE_TABLE_CREATE_PARTITION_WRITER"
+              << ", fragment_instance_id=" << 
print_id(_state->fragment_instance_id())
+              << ", table=" << (_mc_sink.__isset.table_name ? 
_mc_sink.table_name : "")
+              << ", txn_id=" << (_mc_sink.__isset.txn_id ? 
std::to_string(_mc_sink.txn_id) : "")
+              << ", write_session_id="
+              << (_mc_sink.__isset.write_session_id ? 
_mc_sink.write_session_id : "")
+              << ", partition_spec=" << partition_spec;
     return std::make_shared<VMCPartitionWriter>(_state, 
_write_output_vexpr_ctxs, partition_spec,
                                                 std::move(params));
 }
@@ -144,10 +189,41 @@ Status VMCTableWriter::write(RuntimeState* state, Block& block) {
         return Status::OK();
     }
 
+    int64_t start_ms = MonotonicMillis();
+    LOG(INFO) << "MC_DIAG stage=BE_TABLE_WRITE_ENTER"
+              << ", fragment_instance_id=" << 
print_id(state->fragment_instance_id())
+              << ", table=" << (_mc_sink.__isset.table_name ? 
_mc_sink.table_name : "")
+              << ", txn_id=" << (_mc_sink.__isset.txn_id ? 
std::to_string(_mc_sink.txn_id) : "")
+              << ", write_session_id="
+              << (_mc_sink.__isset.write_session_id ? 
_mc_sink.write_session_id : "")
+              << ", input_rows=" << block.rows() << ", input_columns=" << 
block.columns()
+              << ", has_static_partition=" << 
mc_diag_bool(_has_static_partition)
+              << ", partition_columns=" << _partition_column_names.size();
     Block output_block;
-    RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_vec_output_expr_ctxs, block,
-                                                                       &output_block, false));
+    int64_t expr_start_ms = MonotonicMillis();
+    LOG(INFO) << "MC_DIAG stage=BE_TABLE_EXPR_BEFORE"
+              << ", fragment_instance_id=" << 
print_id(state->fragment_instance_id())
+              << ", table=" << (_mc_sink.__isset.table_name ? 
_mc_sink.table_name : "")
+              << ", input_rows=" << block.rows() << ", input_columns=" << 
block.columns();
+    Status status = VExprContext::get_output_block_after_execute_exprs(_vec_output_expr_ctxs, block,
+                                                                       &output_block, false);
+    LOG(INFO) << "MC_DIAG stage=BE_TABLE_EXPR_AFTER"
+              << ", fragment_instance_id=" << 
print_id(state->fragment_instance_id())
+              << ", table=" << (_mc_sink.__isset.table_name ? 
_mc_sink.table_name : "")
+              << ", status=" << status.to_string()
+              << ", cost_ms=" << MonotonicMillis() - expr_start_ms;
+    RETURN_IF_ERROR(status);
+    LOG(INFO) << "MC_DIAG stage=BE_TABLE_MATERIALIZE_BEFORE"
+              << ", fragment_instance_id=" << 
print_id(state->fragment_instance_id())
+              << ", table=" << (_mc_sink.__isset.table_name ? 
_mc_sink.table_name : "")
+              << ", output_rows=" << output_block.rows()
+              << ", output_columns=" << output_block.columns();
     materialize_block_inplace(output_block);
+    LOG(INFO) << "MC_DIAG stage=BE_TABLE_MATERIALIZE_AFTER"
+              << ", fragment_instance_id=" << 
print_id(state->fragment_instance_id())
+              << ", table=" << (_mc_sink.__isset.table_name ? 
_mc_sink.table_name : "")
+              << ", output_rows=" << output_block.rows()
+              << ", output_columns=" << output_block.columns();
 
     _row_count += output_block.rows();
 
@@ -156,12 +232,46 @@ Status VMCTableWriter::write(RuntimeState* state, Block& block) {
         auto it = _partitions_to_writers.find(_static_partition_spec);
         if (it == _partitions_to_writers.end()) {
             auto writer = _create_partition_writer(_static_partition_spec);
-            RETURN_IF_ERROR(writer->open());
+            LOG(INFO) << "MC_DIAG stage=BE_TABLE_PARTITION_OPEN_BEFORE"
+                      << ", fragment_instance_id=" << 
print_id(state->fragment_instance_id())
+                      << ", table=" << (_mc_sink.__isset.table_name ? 
_mc_sink.table_name : "")
+                      << ", txn_id=" << (_mc_sink.__isset.txn_id ? 
std::to_string(_mc_sink.txn_id) : "")
+                      << ", write_session_id="
+                      << (_mc_sink.__isset.write_session_id ? 
_mc_sink.write_session_id : "")
+                      << ", partition_spec=" << _static_partition_spec;
+            status = writer->open();
+            LOG(INFO) << "MC_DIAG stage=BE_TABLE_PARTITION_OPEN_AFTER"
+                      << ", fragment_instance_id=" << 
print_id(state->fragment_instance_id())
+                      << ", table=" << (_mc_sink.__isset.table_name ? 
_mc_sink.table_name : "")
+                      << ", txn_id=" << (_mc_sink.__isset.txn_id ? 
std::to_string(_mc_sink.txn_id) : "")
+                      << ", write_session_id="
+                      << (_mc_sink.__isset.write_session_id ? 
_mc_sink.write_session_id : "")
+                      << ", partition_spec=" << _static_partition_spec
+                      << ", status=" << status.to_string();
+            RETURN_IF_ERROR(status);
             _partitions_to_writers.insert({_static_partition_spec, writer});
             it = _partitions_to_writers.find(_static_partition_spec);
         }
         output_block.erase(_non_write_columns_indices);
-        return it->second->write(output_block);
+        LOG(INFO) << "MC_DIAG stage=BE_TABLE_PARTITION_WRITE_BEFORE"
+                  << ", fragment_instance_id=" << 
print_id(state->fragment_instance_id())
+                  << ", table=" << (_mc_sink.__isset.table_name ? 
_mc_sink.table_name : "")
+                  << ", txn_id=" << (_mc_sink.__isset.txn_id ? 
std::to_string(_mc_sink.txn_id) : "")
+                  << ", write_session_id="
+                  << (_mc_sink.__isset.write_session_id ? 
_mc_sink.write_session_id : "")
+                  << ", partition_spec=" << _static_partition_spec
+                  << ", rows=" << output_block.rows() << ", columns=" << 
output_block.columns();
+        status = it->second->write(output_block);
+        LOG(INFO) << "MC_DIAG stage=BE_TABLE_PARTITION_WRITE_AFTER"
+                  << ", fragment_instance_id=" << 
print_id(state->fragment_instance_id())
+                  << ", table=" << (_mc_sink.__isset.table_name ? 
_mc_sink.table_name : "")
+                  << ", txn_id=" << (_mc_sink.__isset.txn_id ? 
std::to_string(_mc_sink.txn_id) : "")
+                  << ", write_session_id="
+                  << (_mc_sink.__isset.write_session_id ? 
_mc_sink.write_session_id : "")
+                  << ", partition_spec=" << _static_partition_spec
+                  << ", rows=" << output_block.rows() << ", status=" << 
status.to_string()
+                  << ", cost_ms=" << MonotonicMillis() - start_ms;
+        return status;
     }
 
     // Case 2: Dynamic partition or non-partitioned table
@@ -172,20 +282,81 @@ Status VMCTableWriter::write(RuntimeState* state, Block& block) {
     auto it = _partitions_to_writers.find(partition_key);
     if (it == _partitions_to_writers.end()) {
         auto writer = _create_partition_writer("");
-        RETURN_IF_ERROR(writer->open());
+        LOG(INFO) << "MC_DIAG stage=BE_TABLE_PARTITION_OPEN_BEFORE"
+                  << ", fragment_instance_id=" << 
print_id(state->fragment_instance_id())
+                  << ", table=" << (_mc_sink.__isset.table_name ? 
_mc_sink.table_name : "")
+                  << ", txn_id=" << (_mc_sink.__isset.txn_id ? 
std::to_string(_mc_sink.txn_id) : "")
+                  << ", write_session_id="
+                  << (_mc_sink.__isset.write_session_id ? 
_mc_sink.write_session_id : "")
+                  << ", partition_spec=" << partition_key
+                  << ", dynamic_partition=" << 
mc_diag_bool(!_partition_column_names.empty());
+        status = writer->open();
+        LOG(INFO) << "MC_DIAG stage=BE_TABLE_PARTITION_OPEN_AFTER"
+                  << ", fragment_instance_id=" << 
print_id(state->fragment_instance_id())
+                  << ", table=" << (_mc_sink.__isset.table_name ? 
_mc_sink.table_name : "")
+                  << ", txn_id=" << (_mc_sink.__isset.txn_id ? 
std::to_string(_mc_sink.txn_id) : "")
+                  << ", write_session_id="
+                  << (_mc_sink.__isset.write_session_id ? 
_mc_sink.write_session_id : "")
+                  << ", partition_spec=" << partition_key << ", status=" << 
status.to_string();
+        RETURN_IF_ERROR(status);
         _partitions_to_writers.insert({partition_key, writer});
         it = _partitions_to_writers.find(partition_key);
     }
-    return it->second->write(output_block);
+    LOG(INFO) << "MC_DIAG stage=BE_TABLE_PARTITION_WRITE_BEFORE"
+              << ", fragment_instance_id=" << 
print_id(state->fragment_instance_id())
+              << ", table=" << (_mc_sink.__isset.table_name ? 
_mc_sink.table_name : "")
+              << ", txn_id=" << (_mc_sink.__isset.txn_id ? 
std::to_string(_mc_sink.txn_id) : "")
+              << ", write_session_id="
+              << (_mc_sink.__isset.write_session_id ? 
_mc_sink.write_session_id : "")
+              << ", partition_spec=" << partition_key
+              << ", dynamic_partition=" << 
mc_diag_bool(!_partition_column_names.empty())
+              << ", rows=" << output_block.rows() << ", columns=" << 
output_block.columns();
+    status = it->second->write(output_block);
+    LOG(INFO) << "MC_DIAG stage=BE_TABLE_PARTITION_WRITE_AFTER"
+              << ", fragment_instance_id=" << 
print_id(state->fragment_instance_id())
+              << ", table=" << (_mc_sink.__isset.table_name ? 
_mc_sink.table_name : "")
+              << ", txn_id=" << (_mc_sink.__isset.txn_id ? 
std::to_string(_mc_sink.txn_id) : "")
+              << ", write_session_id="
+              << (_mc_sink.__isset.write_session_id ? 
_mc_sink.write_session_id : "")
+              << ", partition_spec=" << partition_key << ", rows=" << 
output_block.rows()
+              << ", status=" << status.to_string()
+              << ", cost_ms=" << MonotonicMillis() - start_ms;
+    return status;
 }
 
 Status VMCTableWriter::close(Status status) {
+    int64_t start_ms = MonotonicMillis();
     Status result_status;
     int64_t partitions_count = _partitions_to_writers.size();
+    LOG(INFO) << "MC_DIAG stage=BE_TABLE_CLOSE_ENTER"
+              << ", fragment_instance_id=" << 
print_id(_state->fragment_instance_id())
+              << ", table=" << (_mc_sink.__isset.table_name ? 
_mc_sink.table_name : "")
+              << ", txn_id=" << (_mc_sink.__isset.txn_id ? 
std::to_string(_mc_sink.txn_id) : "")
+              << ", write_session_id="
+              << (_mc_sink.__isset.write_session_id ? 
_mc_sink.write_session_id : "")
+              << ", input_status=" << status.to_string()
+              << ", partitions_count=" << partitions_count
+              << ", row_count=" << _row_count;
     {
         SCOPED_RAW_TIMER(&_close_ns);
         for (const auto& [partition_spec, writer] : _partitions_to_writers) {
+            LOG(INFO) << "MC_DIAG stage=BE_TABLE_CLOSE_PARTITION_BEFORE"
+                      << ", fragment_instance_id=" << 
print_id(_state->fragment_instance_id())
+                      << ", table=" << (_mc_sink.__isset.table_name ? 
_mc_sink.table_name : "")
+                      << ", txn_id=" << (_mc_sink.__isset.txn_id ? 
std::to_string(_mc_sink.txn_id) : "")
+                      << ", write_session_id="
+                      << (_mc_sink.__isset.write_session_id ? 
_mc_sink.write_session_id : "")
+                      << ", partition_spec=" << partition_spec;
+            int64_t partition_close_start_ms = MonotonicMillis();
             Status st = writer->close(status);
+            LOG(INFO) << "MC_DIAG stage=BE_TABLE_CLOSE_PARTITION_AFTER"
+                      << ", fragment_instance_id=" << 
print_id(_state->fragment_instance_id())
+                      << ", table=" << (_mc_sink.__isset.table_name ? 
_mc_sink.table_name : "")
+                      << ", txn_id=" << (_mc_sink.__isset.txn_id ? 
std::to_string(_mc_sink.txn_id) : "")
+                      << ", write_session_id="
+                      << (_mc_sink.__isset.write_session_id ? 
_mc_sink.write_session_id : "")
+                      << ", partition_spec=" << partition_spec << ", status=" 
<< st.to_string()
+                      << ", cost_ms=" << MonotonicMillis() - 
partition_close_start_ms;
             if (!st.ok()) {
                 LOG(WARNING) << "VMCPartitionWriter close failed for partition " << partition_spec
                              << ": " << st.to_string();
@@ -202,6 +373,16 @@ Status VMCTableWriter::close(Status status) {
         COUNTER_SET(_close_timer, _close_ns);
         COUNTER_SET(_partition_writers_count, partitions_count);
     }
+    LOG(INFO) << "MC_DIAG stage=BE_TABLE_CLOSE_EXIT"
+              << ", fragment_instance_id=" << 
print_id(_state->fragment_instance_id())
+              << ", table=" << (_mc_sink.__isset.table_name ? 
_mc_sink.table_name : "")
+              << ", txn_id=" << (_mc_sink.__isset.txn_id ? 
std::to_string(_mc_sink.txn_id) : "")
+              << ", write_session_id="
+              << (_mc_sink.__isset.write_session_id ? 
_mc_sink.write_session_id : "")
+              << ", result_status=" << result_status.to_string()
+              << ", partitions_count=" << partitions_count
+              << ", row_count=" << _row_count
+              << ", cost_ms=" << MonotonicMillis() - start_ms;
     return result_status;
 }
 
diff --git a/be/src/format/transformer/vjni_format_transformer.cpp b/be/src/format/transformer/vjni_format_transformer.cpp
index 782818f05aa..ac9a9bdb33f 100644
--- a/be/src/format/transformer/vjni_format_transformer.cpp
+++ b/be/src/format/transformer/vjni_format_transformer.cpp
@@ -19,9 +19,19 @@
 
 #include "exec/connector/jni_connector.h"
 #include "runtime/runtime_state.h"
+#include "util/time.h"
 
 namespace doris {
 
+namespace {
+
+std::string mc_diag_param(const std::map<std::string, std::string>& params, 
const std::string& key) {
+    auto it = params.find(key);
+    return it == params.end() ? "" : it->second;
+}
+
+} // namespace
+
 VJniFormatTransformer::VJniFormatTransformer(RuntimeState* state,
                                              const VExprContextSPtrs& 
output_vexpr_ctxs,
                                              std::string writer_class,
@@ -32,45 +42,118 @@ VJniFormatTransformer::VJniFormatTransformer(RuntimeState* state,
 
 Status VJniFormatTransformer::_init_jni_writer(JNIEnv* env, int batch_size) {
     // Load writer class via the same class loader as JniScanner
+    LOG(INFO) << "MC_DIAG stage=BE_JNI_INIT_WRITER_ENTER"
+              << ", writer_class=" << _writer_class
+              << ", table=" << mc_diag_param(_writer_params, "table")
+              << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+              << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id")
+              << ", partition_spec=" << mc_diag_param(_writer_params, 
"partition_spec")
+              << ", batch_size=" << batch_size;
     Jni::GlobalClass jni_writer_cls;
-    RETURN_IF_ERROR(Jni::Util::get_jni_scanner_class(env, 
_writer_class.c_str(), &jni_writer_cls));
+    int64_t start_ms = MonotonicMillis();
+    Status status = Jni::Util::get_jni_scanner_class(env, 
_writer_class.c_str(), &jni_writer_cls);
+    LOG(INFO) << "MC_DIAG stage=BE_JNI_GET_CLASS_AFTER"
+              << ", writer_class=" << _writer_class << ", status=" << 
status.to_string()
+              << ", cost_ms=" << MonotonicMillis() - start_ms;
+    RETURN_IF_ERROR(status);
 
     // Get constructor: (int batchSize, Map<String,String> params)
     Jni::MethodId writer_constructor;
-    RETURN_IF_ERROR(
-            jni_writer_cls.get_method(env, "<init>", "(ILjava/util/Map;)V", 
&writer_constructor));
+    start_ms = MonotonicMillis();
+    status = jni_writer_cls.get_method(env, "<init>", "(ILjava/util/Map;)V", 
&writer_constructor);
+    LOG(INFO) << "MC_DIAG stage=BE_JNI_GET_CONSTRUCTOR_AFTER"
+              << ", writer_class=" << _writer_class << ", status=" << 
status.to_string()
+              << ", cost_ms=" << MonotonicMillis() - start_ms;
+    RETURN_IF_ERROR(status);
 
     // Convert C++ params map to Java HashMap
     Jni::LocalObject hashmap_object;
-    RETURN_IF_ERROR(Jni::Util::convert_to_java_map(env, _writer_params, 
&hashmap_object));
+    start_ms = MonotonicMillis();
+    status = Jni::Util::convert_to_java_map(env, _writer_params, 
&hashmap_object);
+    LOG(INFO) << "MC_DIAG stage=BE_JNI_CONVERT_PARAMS_AFTER"
+              << ", writer_class=" << _writer_class << ", status=" << 
status.to_string()
+              << ", cost_ms=" << MonotonicMillis() - start_ms;
+    RETURN_IF_ERROR(status);
 
     // Create writer instance
-    RETURN_IF_ERROR(jni_writer_cls.new_object(env, writer_constructor)
-                            .with_arg((jint)batch_size)
-                            .with_arg(hashmap_object)
-                            .call(&_jni_writer_obj));
+    start_ms = MonotonicMillis();
+    LOG(INFO) << "MC_DIAG stage=BE_JNI_NEW_WRITER_BEFORE"
+              << ", writer_class=" << _writer_class
+              << ", table=" << mc_diag_param(_writer_params, "table")
+              << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+              << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id");
+    status = jni_writer_cls.new_object(env, writer_constructor)
+                     .with_arg((jint)batch_size)
+                     .with_arg(hashmap_object)
+                     .call(&_jni_writer_obj);
+    LOG(INFO) << "MC_DIAG stage=BE_JNI_NEW_WRITER_AFTER"
+              << ", writer_class=" << _writer_class << ", status=" << 
status.to_string()
+              << ", cost_ms=" << MonotonicMillis() - start_ms;
+    RETURN_IF_ERROR(status);
 
     // Resolve method IDs
-    RETURN_IF_ERROR(jni_writer_cls.get_method(env, "open", "()V", 
&_jni_writer_open));
-    RETURN_IF_ERROR(
-            jni_writer_cls.get_method(env, "write", "(Ljava/util/Map;)V", 
&_jni_writer_write));
-    RETURN_IF_ERROR(jni_writer_cls.get_method(env, "close", "()V", 
&_jni_writer_close));
-    RETURN_IF_ERROR(jni_writer_cls.get_method(env, "getStatistics", 
"()Ljava/util/Map;",
-                                              &_jni_writer_get_statistics));
+    status = jni_writer_cls.get_method(env, "open", "()V", &_jni_writer_open);
+    RETURN_IF_ERROR(status);
+    status = jni_writer_cls.get_method(env, "write", "(Ljava/util/Map;)V", 
&_jni_writer_write);
+    RETURN_IF_ERROR(status);
+    status = jni_writer_cls.get_method(env, "close", "()V", 
&_jni_writer_close);
+    RETURN_IF_ERROR(status);
+    status = jni_writer_cls.get_method(env, "getStatistics", 
"()Ljava/util/Map;",
+                                       &_jni_writer_get_statistics);
+    RETURN_IF_ERROR(status);
+    LOG(INFO) << "MC_DIAG stage=BE_JNI_INIT_WRITER_EXIT"
+              << ", writer_class=" << _writer_class
+              << ", table=" << mc_diag_param(_writer_params, "table")
+              << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+              << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id");
     return Status::OK();
 }
 
 Status VJniFormatTransformer::open() {
+    int64_t open_start_ms = MonotonicMillis();
+    LOG(INFO) << "MC_DIAG stage=BE_JNI_OPEN_ENTER"
+              << ", writer_class=" << _writer_class
+              << ", table=" << mc_diag_param(_writer_params, "table")
+              << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+              << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id")
+              << ", partition_spec=" << mc_diag_param(_writer_params, 
"partition_spec");
     JNIEnv* env = nullptr;
-    RETURN_IF_ERROR(Jni::Env::Get(&env));
+    Status status = Jni::Env::Get(&env);
+    if (!status.ok()) {
+        LOG(INFO) << "MC_DIAG stage=BE_JNI_OPEN_GET_ENV_AFTER"
+                  << ", status=" << status.to_string();
+        return status;
+    }
 
     int batch_size = _state->batch_size();
-    RETURN_IF_ERROR(_init_jni_writer(env, batch_size));
+    status = _init_jni_writer(env, batch_size);
+    if (!status.ok()) {
+        LOG(INFO) << "MC_DIAG stage=BE_JNI_OPEN_INIT_AFTER"
+                  << ", status=" << status.to_string()
+                  << ", cost_ms=" << MonotonicMillis() - open_start_ms;
+        return status;
+    }
 
-    RETURN_IF_ERROR(_jni_writer_obj.call_void_method(env, 
_jni_writer_open).call());
+    int64_t java_open_start_ms = MonotonicMillis();
+    LOG(INFO) << "MC_DIAG stage=BE_JAVA_OPEN_BEFORE"
+              << ", writer_class=" << _writer_class
+              << ", table=" << mc_diag_param(_writer_params, "table")
+              << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+              << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id");
+    status = _jni_writer_obj.call_void_method(env, _jni_writer_open).call();
+    LOG(INFO) << "MC_DIAG stage=BE_JAVA_OPEN_CALL_RETURNED"
+              << ", writer_class=" << _writer_class << ", status=" << 
status.to_string()
+              << ", cost_ms=" << MonotonicMillis() - java_open_start_ms;
+    RETURN_IF_ERROR(status);
     RETURN_ERROR_IF_EXC(env);
 
     _opened = true;
+    LOG(INFO) << "MC_DIAG stage=BE_JNI_OPEN_EXIT"
+              << ", writer_class=" << _writer_class
+              << ", table=" << mc_diag_param(_writer_params, "table")
+              << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+              << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id")
+              << ", cost_ms=" << MonotonicMillis() - open_start_ms;
     return Status::OK();
 }
 
@@ -79,20 +162,50 @@ Status VJniFormatTransformer::write(const Block& block) {
         return Status::OK();
     }
 
+    int64_t write_start_ms = MonotonicMillis();
+    LOG(INFO) << "MC_DIAG stage=BE_JNI_WRITE_ENTER"
+              << ", writer_class=" << _writer_class
+              << ", table=" << mc_diag_param(_writer_params, "table")
+              << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+              << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id")
+              << ", rows=" << block.rows() << ", columns=" << block.columns();
     JNIEnv* env = nullptr;
-    RETURN_IF_ERROR(Jni::Env::Get(&env));
+    Status status = Jni::Env::Get(&env);
+    if (!status.ok()) {
+        LOG(INFO) << "MC_DIAG stage=BE_JNI_WRITE_GET_ENV_AFTER"
+                  << ", status=" << status.to_string();
+        return status;
+    }
 
     // 1. Convert Block to Java table metadata (column addresses)
     Block* mutable_block = const_cast<Block*>(&block);
     std::unique_ptr<long[]> input_table;
-    RETURN_IF_ERROR(JniConnector::to_java_table(mutable_block, input_table));
+    int64_t convert_start_ms = MonotonicMillis();
+    LOG(INFO) << "MC_DIAG stage=BE_TO_JAVA_TABLE_BEFORE"
+              << ", writer_class=" << _writer_class
+              << ", table=" << mc_diag_param(_writer_params, "table")
+              << ", rows=" << block.rows() << ", columns=" << block.columns();
+    status = JniConnector::to_java_table(mutable_block, input_table);
+    LOG(INFO) << "MC_DIAG stage=BE_TO_JAVA_TABLE_AFTER"
+              << ", writer_class=" << _writer_class << ", status=" << 
status.to_string()
+              << ", cost_ms=" << MonotonicMillis() - convert_start_ms;
+    RETURN_IF_ERROR(status);
 
     // 2. Cache schema on first call
     if (!_schema_cached) {
+        int64_t schema_start_ms = MonotonicMillis();
+        LOG(INFO) << "MC_DIAG stage=BE_PARSE_SCHEMA_BEFORE"
+                  << ", writer_class=" << _writer_class
+                  << ", table=" << mc_diag_param(_writer_params, "table");
         auto schema = JniConnector::parse_table_schema(mutable_block);
         _cached_required_fields = schema.first;
         _cached_columns_types = schema.second;
         _schema_cached = true;
+        LOG(INFO) << "MC_DIAG stage=BE_PARSE_SCHEMA_AFTER"
+                  << ", writer_class=" << _writer_class
+                  << ", required_fields_length=" << 
_cached_required_fields.size()
+                  << ", columns_types_length=" << _cached_columns_types.size()
+                  << ", cost_ms=" << MonotonicMillis() - schema_start_ms;
     }
 
     // 3. Build input params map for Java writer
@@ -103,28 +216,83 @@ Status VJniFormatTransformer::write(const Block& block) {
 
     // 4. Convert to Java Map and call writer.write(inputParams)
     Jni::LocalObject input_map;
-    RETURN_IF_ERROR(Jni::Util::convert_to_java_map(env, input_params, 
&input_map));
+    int64_t map_start_ms = MonotonicMillis();
+    status = Jni::Util::convert_to_java_map(env, input_params, &input_map);
+    LOG(INFO) << "MC_DIAG stage=BE_CONVERT_INPUT_MAP_AFTER"
+              << ", writer_class=" << _writer_class << ", status=" << 
status.to_string()
+              << ", cost_ms=" << MonotonicMillis() - map_start_ms;
+    RETURN_IF_ERROR(status);
 
-    RETURN_IF_ERROR(
-            _jni_writer_obj.call_void_method(env, 
_jni_writer_write).with_arg(input_map).call());
+    int64_t java_write_start_ms = MonotonicMillis();
+    LOG(INFO) << "MC_DIAG stage=BE_JAVA_WRITE_BEFORE"
+              << ", writer_class=" << _writer_class
+              << ", table=" << mc_diag_param(_writer_params, "table")
+              << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+              << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id")
+              << ", rows=" << block.rows() << ", columns=" << block.columns();
+    status = _jni_writer_obj.call_void_method(env, 
_jni_writer_write).with_arg(input_map).call();
+    LOG(INFO) << "MC_DIAG stage=BE_JAVA_WRITE_CALL_RETURNED"
+              << ", writer_class=" << _writer_class << ", status=" << 
status.to_string()
+              << ", cost_ms=" << MonotonicMillis() - java_write_start_ms;
+    RETURN_IF_ERROR(status);
     RETURN_ERROR_IF_EXC(env);
 
     _cur_written_rows += block.rows();
+    LOG(INFO) << "MC_DIAG stage=BE_JNI_WRITE_EXIT"
+              << ", writer_class=" << _writer_class
+              << ", table=" << mc_diag_param(_writer_params, "table")
+              << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+              << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id")
+              << ", rows=" << block.rows()
+              << ", cur_written_rows=" << _cur_written_rows
+              << ", cost_ms=" << MonotonicMillis() - write_start_ms;
     return Status::OK();
 }
 
 Status VJniFormatTransformer::close() {
     if (_closed || !_opened) {
+        LOG(INFO) << "MC_DIAG stage=BE_JNI_CLOSE_SKIP"
+                  << ", writer_class=" << _writer_class
+                  << ", table=" << mc_diag_param(_writer_params, "table")
+                  << ", opened=" << _opened << ", closed=" << _closed;
         return Status::OK();
     }
     _closed = true;
 
+    int64_t close_start_ms = MonotonicMillis();
+    LOG(INFO) << "MC_DIAG stage=BE_JNI_CLOSE_ENTER"
+              << ", writer_class=" << _writer_class
+              << ", table=" << mc_diag_param(_writer_params, "table")
+              << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+              << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id")
+              << ", cur_written_rows=" << _cur_written_rows;
     JNIEnv* env = nullptr;
-    RETURN_IF_ERROR(Jni::Env::Get(&env));
+    Status status = Jni::Env::Get(&env);
+    if (!status.ok()) {
+        LOG(INFO) << "MC_DIAG stage=BE_JNI_CLOSE_GET_ENV_AFTER"
+                  << ", status=" << status.to_string();
+        return status;
+    }
 
-    RETURN_IF_ERROR(_jni_writer_obj.call_void_method(env, 
_jni_writer_close).call());
+    int64_t java_close_start_ms = MonotonicMillis();
+    LOG(INFO) << "MC_DIAG stage=BE_JAVA_CLOSE_BEFORE"
+              << ", writer_class=" << _writer_class
+              << ", table=" << mc_diag_param(_writer_params, "table")
+              << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+              << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id");
+    status = _jni_writer_obj.call_void_method(env, _jni_writer_close).call();
+    LOG(INFO) << "MC_DIAG stage=BE_JAVA_CLOSE_CALL_RETURNED"
+              << ", writer_class=" << _writer_class << ", status=" << 
status.to_string()
+              << ", cost_ms=" << MonotonicMillis() - java_close_start_ms;
+    RETURN_IF_ERROR(status);
     RETURN_ERROR_IF_EXC(env);
 
+    LOG(INFO) << "MC_DIAG stage=BE_JNI_CLOSE_EXIT"
+              << ", writer_class=" << _writer_class
+              << ", table=" << mc_diag_param(_writer_params, "table")
+              << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+              << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id")
+              << ", cost_ms=" << MonotonicMillis() - close_start_ms;
     return Status::OK();
 }
 
@@ -136,26 +304,57 @@ int64_t VJniFormatTransformer::written_len() {
 std::map<std::string, std::string> VJniFormatTransformer::get_statistics() {
     std::map<std::string, std::string> result;
     if (!_opened) {
+        LOG(INFO) << "MC_DIAG stage=BE_JNI_GET_STATS_SKIP_NOT_OPENED"
+                  << ", writer_class=" << _writer_class
+                  << ", table=" << mc_diag_param(_writer_params, "table");
         return result;
     }
 
+    int64_t start_ms = MonotonicMillis();
+    LOG(INFO) << "MC_DIAG stage=BE_JNI_GET_STATS_ENTER"
+              << ", writer_class=" << _writer_class
+              << ", table=" << mc_diag_param(_writer_params, "table")
+              << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+              << ", write_session_id=" << mc_diag_param(_writer_params, 
"write_session_id");
     JNIEnv* env = nullptr;
-    if (!Jni::Env::Get(&env).ok()) {
+    Status status = Jni::Env::Get(&env);
+    if (!status.ok()) {
+        LOG(INFO) << "MC_DIAG stage=BE_JNI_GET_STATS_GET_ENV_AFTER"
+                  << ", status=" << status.to_string();
         return result;
     }
 
     Jni::LocalObject stats_map;
-    if (!_jni_writer_obj.call_object_method(env, _jni_writer_get_statistics)
-                 .call(&stats_map)
-                 .ok()) {
+    status = _jni_writer_obj.call_object_method(env, 
_jni_writer_get_statistics).call(&stats_map);
+    LOG(INFO) << "MC_DIAG stage=BE_JAVA_GET_STATS_CALL_RETURNED"
+              << ", writer_class=" << _writer_class << ", status=" << 
status.to_string()
+              << ", cost_ms=" << MonotonicMillis() - start_ms;
+    if (!status.ok()) {
         return result;
     }
     if (stats_map.uninitialized()) {
+        LOG(INFO) << "MC_DIAG stage=BE_JNI_GET_STATS_EMPTY"
+                  << ", writer_class=" << _writer_class
+                  << ", table=" << mc_diag_param(_writer_params, "table");
         return result;
     }
 
     // Convert Java Map<String,String> to C++ map
     static_cast<void>(Jni::Util::convert_to_cpp_map(env, stats_map, &result));
+    auto commit_it = result.find("mc_commit_message");
+    auto written_rows_it = result.find("counter:WrittenRows");
+    auto written_bytes_it = result.find("bytes:WrittenBytes");
+    LOG(INFO) << "MC_DIAG stage=BE_JNI_GET_STATS_EXIT"
+              << ", writer_class=" << _writer_class
+              << ", table=" << mc_diag_param(_writer_params, "table")
+              << ", stats_size=" << result.size()
+              << ", commit_message_length="
+              << (commit_it == result.end() ? 0 : commit_it->second.size())
+              << ", written_rows="
+              << (written_rows_it == result.end() ? "" : 
written_rows_it->second)
+              << ", written_bytes="
+              << (written_bytes_it == result.end() ? "" : 
written_bytes_it->second)
+              << ", cost_ms=" << MonotonicMillis() - start_ms;
     return result;
 }
 
diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniWriter.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniWriter.java
index 2dfefd2ac79..b94f7a0908f 100644
--- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniWriter.java
+++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniWriter.java
@@ -20,6 +20,8 @@ package org.apache.doris.common.jni;
 import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.jni.vec.VectorTable;
 
+import org.apache.log4j.Logger;
+
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
@@ -32,6 +34,8 @@ import java.util.Map;
  * Lifecycle: open() -> write() [repeated] -> close()
  */
 public abstract class JniWriter {
+    private static final Logger LOG = Logger.getLogger(JniWriter.class);
+
     protected int batchSize;
     protected Map<String, String> params;
     protected ColumnType[] columnTypes;
@@ -51,10 +55,21 @@ public abstract class JniWriter {
      * then delegates to writeInternal.
      */
     public void write(Map<String, String> inputParams) throws IOException {
+        long writeEnterNs = System.nanoTime();
+        String requiredFields = inputParams.get("required_fields");
+        String columnsTypes = inputParams.get("columns_types");
+        LOG.info("MC_DIAG stage=JNI_WRITER_WRITE_ENTER writer=" + 
getClass().getName()
+                + ", batchSize=" + batchSize
+                + ", schemaCached=" + (columnTypes != null)
+                + ", requiredFieldsLength=" + (requiredFields == null ? 0 : 
requiredFields.length())
+                + ", columnsTypesLength=" + (columnsTypes == null ? 0 : 
columnsTypes.length())
+                + ", thread=" + Thread.currentThread().getName());
+
         // Parse and cache schema on first call
         if (columnTypes == null) {
-            String requiredFields = inputParams.get("required_fields");
-            String columnsTypes = inputParams.get("columns_types");
+            long schemaStartNs = System.nanoTime();
+            LOG.info("MC_DIAG stage=JNI_WRITER_SCHEMA_PARSE_BEFORE writer=" + 
getClass().getName()
+                    + ", thread=" + Thread.currentThread().getName());
             if (requiredFields != null && !requiredFields.isEmpty()) {
                 fields = requiredFields.split(",");
                 String[] typeStrs = columnsTypes.split("#");
@@ -66,15 +81,37 @@ public abstract class JniWriter {
                 fields = new String[0];
                 columnTypes = new ColumnType[0];
             }
+            LOG.info("MC_DIAG stage=JNI_WRITER_SCHEMA_PARSE_AFTER writer=" + 
getClass().getName()
+                    + ", fields=" + fields.length
+                    + ", columnTypes=" + columnTypes.length
+                    + ", costMs=" + elapsedMs(schemaStartNs)
+                    + ", thread=" + Thread.currentThread().getName());
         }
 
         long startRead = System.nanoTime();
+        LOG.info("MC_DIAG stage=JNI_WRITER_CREATE_READABLE_TABLE_BEFORE writer=" + getClass().getName()
+                + ", thread=" + Thread.currentThread().getName());
         VectorTable inputTable = VectorTable.createReadableTable(inputParams);
         readTableTime += System.nanoTime() - startRead;
+        LOG.info("MC_DIAG stage=JNI_WRITER_CREATE_READABLE_TABLE_AFTER writer=" + getClass().getName()
+                + ", rows=" + inputTable.getNumRows()
+                + ", columns=" + inputTable.getNumColumns()
+                + ", costMs=" + elapsedMs(startRead)
+                + ", thread=" + Thread.currentThread().getName());
 
         long startWrite = System.nanoTime();
+        LOG.info("MC_DIAG stage=JNI_WRITER_WRITE_INTERNAL_BEFORE writer=" + 
getClass().getName()
+                + ", rows=" + inputTable.getNumRows()
+                + ", columns=" + inputTable.getNumColumns()
+                + ", thread=" + Thread.currentThread().getName());
         writeInternal(inputTable);
         writeTime += System.nanoTime() - startWrite;
+        LOG.info("MC_DIAG stage=JNI_WRITER_WRITE_INTERNAL_AFTER writer=" + 
getClass().getName()
+                + ", rows=" + inputTable.getNumRows()
+                + ", columns=" + inputTable.getNumColumns()
+                + ", writeCostMs=" + elapsedMs(startWrite)
+                + ", totalCostMs=" + elapsedMs(writeEnterNs)
+                + ", thread=" + Thread.currentThread().getName());
     }
 
     protected abstract void writeInternal(VectorTable inputTable) throws 
IOException;
@@ -96,4 +133,8 @@ public abstract class JniWriter {
     public long getReadTableTime() {
         return readTableTime;
     }
+
+    private static long elapsedMs(long startNs) {
+        return (System.nanoTime() - startNs) / 1_000_000L;
+    }
 }
diff --git a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java
index 82b58f48493..1c907b7c331 100644
--- a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java
+++ b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java
@@ -87,12 +87,22 @@ class MaxComputeFeClient implements AutoCloseable {
             throw new IOException("empty MaxCompute write_session_id for block_id allocation");
         }
 
+        long startNs = System.nanoTime();
+        LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_BLOCK_ID_BEFORE txnId=" + txnId
+                + ", writeSessionId=" + writeSessionId
+                + ", masterFe=" + formatAddress(masterAddress)
+                + ", rpcTimeoutMs=" + rpcTimeoutMs);
         TMaxComputeBlockIdRequest request = buildBlockIdRequest(txnId, 
writeSessionId);
-        return callWithMasterRedirect(
+        long blockId = callWithMasterRedirect(
                 "allocate MaxCompute block_id",
                 client -> client.getMaxComputeBlockIdRange(request),
                 (result, requestAddress, retryTimes) ->
                         handleBlockIdResult(result, requestAddress, 
retryTimes, txnId, writeSessionId));
+        LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_BLOCK_ID_AFTER txnId=" + txnId
+                + ", writeSessionId=" + writeSessionId
+                + ", blockId=" + blockId
+                + ", costMs=" + elapsedMs(startNs));
+        return blockId;
     }
 
     @Override
@@ -109,16 +119,30 @@ class MaxComputeFeClient implements AutoCloseable {
         for (int retryTimes = 0; retryTimes < FETCH_BLOCK_ID_MAX_RETRY_TIMES; 
retryTimes++) {
             TNetworkAddress requestAddress = copyAddress(masterAddress);
             T result;
+            long rpcStartNs = System.nanoTime();
+            LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_RPC_BEFORE operation=" + 
operation
+                    + ", retry=" + retryTimes
+                    + ", fe=" + formatAddress(requestAddress)
+                    + ", rpcTimeoutMs=" + rpcTimeoutMs
+                    + ", framedTransport=" + useFramedTransport());
             try {
                 result = rpcExecutor.call(requestAddress, rpcTimeoutMs, 
useFramedTransport(), call);
             } catch (Exception e) {
                 lastException = e;
                 rpcExecutor.close();
+                LOG.warn("MC_DIAG stage=JAVA_FE_CLIENT_RPC_ERROR operation=" + 
operation
+                        + ", retry=" + retryTimes
+                        + ", fe=" + formatAddress(requestAddress)
+                        + ", costMs=" + elapsedMs(rpcStartNs), e);
                 LOG.warn("Failed to " + operation + ", rpc failure, 
retry_time="
                         + retryTimes + ", fe=" + 
formatAddress(requestAddress), e);
                 sleepBeforeRetry();
                 continue;
             }
+            LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_RPC_AFTER operation=" + 
operation
+                    + ", retry=" + retryTimes
+                    + ", fe=" + formatAddress(requestAddress)
+                    + ", costMs=" + elapsedMs(rpcStartNs));
 
             try {
                 return handler.handle(result, requestAddress, retryTimes);
@@ -155,6 +179,11 @@ class MaxComputeFeClient implements AutoCloseable {
                     + formatAddress(requestAddress) + ", switch to FE@" + 
formatAddress(result.getMasterAddress())
                     + ", retry_time=" + retryTimes + ", txn_id=" + txnId
                     + ", write_session_id=" + writeSessionId);
+            LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_BLOCK_ID_NOT_MASTER txnId=" 
+ txnId
+                    + ", writeSessionId=" + writeSessionId
+                    + ", requestFe=" + formatAddress(requestAddress)
+                    + ", masterFe=" + formatAddress(result.getMasterAddress())
+                    + ", retry=" + retryTimes);
             throw new NotMasterException(result.getMasterAddress());
         }
 
@@ -173,11 +202,11 @@ class MaxComputeFeClient implements AutoCloseable {
                     + result.getLength() + ", txn_id=" + txnId + ", 
write_session_id=" + writeSessionId);
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Allocated MaxCompute block_id from FE@" + 
formatAddress(requestAddress)
-                    + ", txn_id=" + txnId + ", write_session_id=" + 
writeSessionId
-                    + ", block_id=" + result.getStart());
-        }
+        LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_BLOCK_ID_RESULT txnId=" + txnId
+                + ", writeSessionId=" + writeSessionId
+                + ", fe=" + formatAddress(requestAddress)
+                + ", retry=" + retryTimes
+                + ", blockId=" + result.getStart());
         return result.getStart();
     }
 
@@ -248,6 +277,10 @@ class MaxComputeFeClient implements AutoCloseable {
         return address.getHostname() + ":" + address.getPort();
     }
 
+    private static long elapsedMs(long startNs) {
+        return (System.nanoTime() - startNs) / 1_000_000L;
+    }
+
     interface RpcExecutor {
         <T> T call(TNetworkAddress address, int timeoutMs, boolean 
useFramedTransport,
                 FeCall<T> call) throws Exception;
@@ -273,10 +306,22 @@ class MaxComputeFeClient implements AutoCloseable {
         @Override
         public synchronized <T> T call(TNetworkAddress address, int timeoutMs, 
boolean useFramedTransport,
                 FeCall<T> call) throws Exception {
+            long callStartNs = System.nanoTime();
+            LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_EXECUTOR_CALL_ENTER fe=" + 
formatAddress(address)
+                    + ", timeoutMs=" + timeoutMs
+                    + ", framedTransport=" + useFramedTransport);
             ensureConnected(address, timeoutMs, useFramedTransport);
             try {
-                return call.call(client);
+                long thriftCallStartNs = System.nanoTime();
+                LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_THRIFT_CALL_BEFORE fe=" 
+ formatAddress(address));
+                T result = call.call(client);
+                LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_THRIFT_CALL_AFTER fe=" 
+ formatAddress(address)
+                        + ", thriftCallCostMs=" + elapsedMs(thriftCallStartNs)
+                        + ", totalCostMs=" + elapsedMs(callStartNs));
+                return result;
             } catch (Exception e) {
+                LOG.warn("MC_DIAG stage=JAVA_FE_CLIENT_EXECUTOR_CALL_ERROR 
fe=" + formatAddress(address)
+                        + ", costMs=" + elapsedMs(callStartNs), e);
                 close();
                 throw e;
             }
@@ -297,10 +342,15 @@ class MaxComputeFeClient implements AutoCloseable {
             if (client != null && transport != null && transport.isOpen()
                     && connectedFramedTransport == useFramedTransport
                     && sameAddress(connectedAddress, address)) {
+                LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_REUSE_CONNECTION fe=" + 
formatAddress(address));
                 return;
             }
 
             close();
+            long connectStartNs = System.nanoTime();
+            LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_CONNECT_BEFORE fe=" + 
formatAddress(address)
+                    + ", timeoutMs=" + timeoutMs
+                    + ", framedTransport=" + useFramedTransport);
             TTransport newTransport = createTransport(address, timeoutMs, 
useFramedTransport);
             try {
                 newTransport.open();
@@ -308,7 +358,11 @@ class MaxComputeFeClient implements AutoCloseable {
                 client = new FrontendService.Client(new 
TBinaryProtocol(transport));
                 connectedAddress = copyAddress(address);
                 connectedFramedTransport = useFramedTransport;
+                LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_CONNECT_AFTER fe=" + 
formatAddress(address)
+                        + ", costMs=" + elapsedMs(connectStartNs));
             } catch (Exception e) {
+                LOG.warn("MC_DIAG stage=JAVA_FE_CLIENT_CONNECT_ERROR fe=" + 
formatAddress(address)
+                        + ", costMs=" + elapsedMs(connectStartNs), e);
                 newTransport.close();
                 throw e;
             }
diff --git a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
index 9788184057e..334f7124f59 100644
--- a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
+++ b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
@@ -156,18 +156,37 @@ public class MaxComputeJniWriter extends JniWriter {
                 params.getOrDefault(MCProperties.WRITE_MAX_BLOCK_BYTES,
                         MCProperties.DEFAULT_WRITE_MAX_BLOCK_BYTES));
         this.feClient = MaxComputeFeClient.create(params);
+        logDiag("JAVA_WRITER_CONSTRUCTED",
+                "connectTimeout=" + connectTimeout
+                        + ", readTimeout=" + readTimeout
+                        + ", retryCount=" + retryCount
+                        + ", maxBlockBytes=" + maxBlockBytes
+                        + ", preallocatedBlockId=" + preallocatedBlockId);
     }
 
     @Override
     public void open() throws IOException {
+        logDiag("JAVA_OPEN_ENTER",
+                "connectTimeout=" + connectTimeout
+                        + ", readTimeout=" + readTimeout
+                        + ", retryCount=" + retryCount
+                        + ", maxBlockBytes=" + maxBlockBytes);
         try {
+            long stageStartNs = System.nanoTime();
+            logDiag("JAVA_CREATE_MC_CLIENT_BEFORE", "");
             Odps odps = MCUtils.createMcClient(params);
             odps.setDefaultProject(project);
             odps.setEndpoint(endpoint);
+            logDiag("JAVA_CREATE_MC_CLIENT_AFTER", "costMs=" + 
elapsedMs(stageStartNs));
 
+            stageStartNs = System.nanoTime();
+            logDiag("JAVA_BUILD_CREDENTIALS_BEFORE", "");
             Credentials credentials = 
Credentials.newBuilder().withAccount(odps.getAccount())
                     .withAppAccount(odps.getAppAccount()).build();
+            logDiag("JAVA_BUILD_CREDENTIALS_AFTER", "costMs=" + 
elapsedMs(stageStartNs));
 
+            stageStartNs = System.nanoTime();
+            logDiag("JAVA_BUILD_SETTINGS_BEFORE", "");
             RestOptions restOptions = RestOptions.newBuilder()
                     .withConnectTimeout(connectTimeout)
                     .withReadTimeout(readTimeout)
@@ -179,16 +198,22 @@ public class MaxComputeJniWriter extends JniWriter {
                     .withQuotaName(Strings.isNullOrEmpty(quota) ? null : quota)
                     .withRestOptions(restOptions)
                     .build();
+            logDiag("JAVA_BUILD_SETTINGS_AFTER", "costMs=" + 
elapsedMs(stageStartNs));
 
             // Restore the write session created by FE
+            stageStartNs = System.nanoTime();
+            logDiag("JAVA_RESTORE_SESSION_BEFORE", "");
             writeSession = new TableWriteSessionBuilder()
                     
.identifier(com.aliyun.odps.table.TableIdentifier.of(project, tableName))
                     .withSessionId(writeSessionId)
                     .withSettings(settings)
                     .buildBatchWriteSession();
+            logDiag("JAVA_RESTORE_SESSION_AFTER", "costMs=" + 
elapsedMs(stageStartNs));
 
             // SDK skips ArrowOptions when restoring session via withSessionId,
             // set it via reflection to avoid NPE in ArrowWriterImpl
+            stageStartNs = System.nanoTime();
+            logDiag("JAVA_SET_ARROW_OPTIONS_BEFORE", "");
             ArrowOptions arrowOptions = ArrowOptions.newBuilder()
                     .withDatetimeUnit(TimestampUnit.MILLI)
                     .withTimestampUnit(TimestampUnit.MILLI)
@@ -197,8 +222,11 @@ public class MaxComputeJniWriter extends JniWriter {
                     .getSuperclass().getDeclaredField("arrowOptions");
             arrowField.setAccessible(true);
             arrowField.set(writeSession, arrowOptions);
+            logDiag("JAVA_SET_ARROW_OPTIONS_AFTER", "costMs=" + 
elapsedMs(stageStartNs));
 
             // Get schema info for type mapping
+            stageStartNs = System.nanoTime();
+            logDiag("JAVA_REQUIRED_SCHEMA_BEFORE", "");
             com.aliyun.odps.table.DataSchema dataSchema = 
writeSession.requiredSchema();
             columnTypeInfos = new java.util.ArrayList<>();
             columnNames = new java.util.ArrayList<>();
@@ -206,20 +234,30 @@ public class MaxComputeJniWriter extends JniWriter {
                 columnTypeInfos.add(col.getTypeInfo());
                 columnNames.add(col.getName());
             }
+            logDiag("JAVA_REQUIRED_SCHEMA_AFTER",
+                    "columns=" + columnNames.size() + ", costMs=" + 
elapsedMs(stageStartNs));
 
+            stageStartNs = System.nanoTime();
+            logDiag("JAVA_ALLOCATOR_CREATE_BEFORE", "");
             allocator = new RootAllocator(Long.MAX_VALUE);
+            logDiag("JAVA_ALLOCATOR_CREATE_AFTER", "costMs=" + 
elapsedMs(stageStartNs));
 
+            stageStartNs = System.nanoTime();
+            logDiag("JAVA_WRITER_OPTIONS_BEFORE", "");
             writerOptions = WriterOptions.newBuilder()
                     .withSettings(settings)
                     .withCompressionCodec(CompressionCodec.ZSTD)
                     .build();
+            logDiag("JAVA_WRITER_OPTIONS_AFTER", "costMs=" + 
elapsedMs(stageStartNs));
             openBatchWriter(resolveInitialBlockId());
 
             LOG.info("MaxComputeJniWriter opened: project=" + project + ", 
table=" + tableName
                     + ", writeSessionId=" + writeSessionId + ", 
partitionSpec=" + partitionSpec
                     + ", blockId=" + currentBlockId);
+            logDiag("JAVA_OPEN_EXIT", "");
         } catch (Exception e) {
             String errorMsg = "Failed to open MaxCompute write session for 
table " + project + "." + tableName;
+            logDiag("JAVA_OPEN_ERROR", "error=" + e.getClass().getName() + ": 
" + e.getMessage());
             LOG.error(errorMsg, e);
             throw new IOException(errorMsg, e);
         }
@@ -233,58 +271,112 @@ public class MaxComputeJniWriter extends JniWriter {
             return;
         }
 
+        long startNs = System.nanoTime();
+        logDiag("JAVA_WRITE_INTERNAL_ENTER", "rows=" + numRows + ", columns=" 
+ numCols);
         try {
             writeRowsWithRowChecks(inputTable, numRows, numCols);
+            logDiag("JAVA_WRITE_INTERNAL_EXIT",
+                    "rows=" + numRows + ", columns=" + numCols + ", costMs=" + 
elapsedMs(startNs));
         } catch (Exception e) {
             String errorMsg = "Failed to write data to MaxCompute table " + 
project + "." + tableName;
+            logDiag("JAVA_WRITE_INTERNAL_ERROR",
+                    "rows=" + numRows + ", columns=" + numCols
+                            + ", costMs=" + elapsedMs(startNs)
+                            + ", error=" + e.getClass().getName() + ": " + 
e.getMessage());
             LOG.error(errorMsg, e);
             throw new IOException(errorMsg, e);
         }
     }
 
     private long resolveInitialBlockId() throws IOException {
-        return preallocatedBlockId != null ? preallocatedBlockId : 
requestBlockId();
+        if (preallocatedBlockId != null) {
+            logDiag("JAVA_RESOLVE_INITIAL_BLOCK_PREALLOCATED", "blockId=" + 
preallocatedBlockId);
+            return preallocatedBlockId;
+        }
+        return requestBlockId();
     }
 
     private long requestBlockId() throws IOException {
-        return feClient.requestBlockId(txnId, writeSessionId);
+        long startNs = System.nanoTime();
+        logDiag("JAVA_REQUEST_BLOCK_ID_BEFORE", "");
+        long blockId = feClient.requestBlockId(txnId, writeSessionId);
+        logDiag("JAVA_REQUEST_BLOCK_ID_AFTER",
+                "newBlockId=" + blockId + ", costMs=" + elapsedMs(startNs));
+        return blockId;
     }
 
     private void openBatchWriter(long blockId) throws IOException {
+        long startNs = System.nanoTime();
+        logDiag("JAVA_OPEN_BATCH_WRITER_BEFORE", "targetBlockId=" + blockId);
         currentBlockId = blockId;
         currentBlockWrittenBytes = 0L;
         batchWriter = writeSession.createArrowWriter(blockId, 
WriterAttemptId.of(0), writerOptions);
+        logDiag("JAVA_OPEN_BATCH_WRITER_AFTER",
+                "targetBlockId=" + blockId + ", costMs=" + elapsedMs(startNs));
     }
 
     private void closeCurrentBatchWriterAndCollectCommit() throws IOException {
         if (batchWriter == null) {
+            logDiag("JAVA_COMMIT_SKIP_NO_BATCH_WRITER", "");
             return;
         }
+        long startNs = System.nanoTime();
+        logDiag("JAVA_COMMIT_BEFORE",
+                "currentBlockWrittenBytes=" + currentBlockWrittenBytes
+                        + ", commitMessageCount=" + commitMessages.size());
         WriterCommitMessage commitMessage = batchWriter.commit();
         if (commitMessage != null) {
             commitMessages.add(commitMessage);
         }
         batchWriter = null;
+        logDiag("JAVA_COMMIT_AFTER",
+                "commitMessageAdded=" + (commitMessage != null)
+                        + ", commitMessageCount=" + commitMessages.size()
+                        + ", costMs=" + elapsedMs(startNs));
     }
 
     private void rotateCurrentBatchWriter() throws IOException {
+        long startNs = System.nanoTime();
+        logDiag("JAVA_ROTATE_BEFORE",
+                "currentBlockWrittenBytes=" + currentBlockWrittenBytes
+                        + ", commitMessageCount=" + commitMessages.size());
         closeCurrentBatchWriterAndCollectCommit();
         openBatchWriter(requestBlockId());
+        logDiag("JAVA_ROTATE_AFTER", "costMs=" + elapsedMs(startNs));
     }
 
     private void writeRowsWithRowChecks(VectorTable inputTable, int numRows, 
int numCols) throws IOException {
+        logDiag("JAVA_WRITE_ROWS_ENTER", "rows=" + numRows + ", columns=" + 
numCols);
         int rowStart = 0;
         while (rowStart < numRows) {
             int rowEnd = rowStart;
             long batchEstimatedBytes = 0L;
             boolean rotateAfterWrite = false;
+            int estimatedRows = 0;
+            long rangeStartNs = System.nanoTime();
+            logDiag("JAVA_RANGE_SELECT_BEFORE",
+                    "rowStart=" + rowStart
+                            + ", rows=" + numRows
+                            + ", currentBlockWrittenBytes=" + 
currentBlockWrittenBytes);
             while (rowEnd < numRows) {
+                if (estimatedRows == 0 || estimatedRows % 1024 == 0) {
+                    logDiag("JAVA_RANGE_SELECT_PROGRESS",
+                            "rowStart=" + rowStart
+                                    + ", probingRow=" + rowEnd
+                                    + ", estimatedRows=" + estimatedRows
+                                    + ", batchEstimatedBytes=" + 
batchEstimatedBytes);
+                }
                 long rowEstimatedBytes = 
estimateSingleRowPayloadBytes(inputTable, numCols, rowEnd);
+                estimatedRows++;
                 boolean exceedsHardLimit = currentBlockWrittenBytes + 
batchEstimatedBytes
                         + rowEstimatedBytes > maxBlockBytes;
                 if (exceedsHardLimit) {
                     if (rowEnd == rowStart) {
                         if (currentBlockWrittenBytes > 0) {
+                            
logDiag("JAVA_RANGE_SELECT_ROTATE_FOR_OVERSIZE_ROW",
+                                    "rowStart=" + rowStart
+                                            + ", rowEstimatedBytes=" + 
rowEstimatedBytes
+                                            + ", currentBlockWrittenBytes=" + 
currentBlockWrittenBytes);
                             rotateCurrentBatchWriter();
                             continue;
                         }
@@ -301,28 +393,75 @@ public class MaxComputeJniWriter extends JniWriter {
                     break;
                 }
             }
+            logDiag("JAVA_RANGE_SELECT_AFTER",
+                    "rowStart=" + rowStart
+                            + ", rowEnd=" + rowEnd
+                            + ", selectedRows=" + (rowEnd - rowStart)
+                            + ", estimatedRows=" + estimatedRows
+                            + ", batchEstimatedBytes=" + batchEstimatedBytes
+                            + ", rotateAfterWrite=" + rotateAfterWrite
+                            + ", costMs=" + elapsedMs(rangeStartNs));
 
             if (rowEnd == rowStart) {
+                long fallbackStartNs = System.nanoTime();
+                logDiag("JAVA_RANGE_SELECT_FALLBACK_BEFORE", "rowStart=" + 
rowStart);
                 long rowEstimatedBytes = 
estimateSingleRowPayloadBytes(inputTable, numCols, rowStart);
                 batchEstimatedBytes = rowEstimatedBytes;
                 rowEnd = rowStart + 1;
                 rotateAfterWrite = true;
+                logDiag("JAVA_RANGE_SELECT_FALLBACK_AFTER",
+                        "rowStart=" + rowStart
+                                + ", rowEstimatedBytes=" + rowEstimatedBytes
+                                + ", costMs=" + elapsedMs(fallbackStartNs));
             }
 
-            try (VectorSchemaRoot root = buildRowRangeRoot(inputTable, 
numCols, rowStart, rowEnd)) {
+            long buildStartNs = System.nanoTime();
+            logDiag("JAVA_BUILD_ROOT_BEFORE",
+                    "rowStart=" + rowStart + ", rowEnd=" + rowEnd
+                            + ", selectedRows=" + (rowEnd - rowStart));
+            try (VectorSchemaRoot root = buildRowRangeRoot(inputTable, 
numCols, rowStart, rowEnd, true)) {
+                logDiag("JAVA_BUILD_ROOT_AFTER",
+                        "rowStart=" + rowStart + ", rowEnd=" + rowEnd
+                                + ", selectedRows=" + (rowEnd - rowStart)
+                                + ", costMs=" + elapsedMs(buildStartNs));
+                long writeStartNs = System.nanoTime();
+                logDiag("JAVA_BATCH_WRITE_BEFORE",
+                        "rowStart=" + rowStart + ", rowEnd=" + rowEnd
+                                + ", selectedRows=" + (rowEnd - rowStart)
+                                + ", batchEstimatedBytes=" + 
batchEstimatedBytes);
                 batchWriter.write(root);
+                logDiag("JAVA_BATCH_WRITE_AFTER",
+                        "rowStart=" + rowStart + ", rowEnd=" + rowEnd
+                                + ", selectedRows=" + (rowEnd - rowStart)
+                                + ", costMs=" + elapsedMs(writeStartNs));
             }
+            long flushStartNs = System.nanoTime();
+            logDiag("JAVA_FLUSH_BEFORE",
+                    "rowStart=" + rowStart + ", rowEnd=" + rowEnd
+                            + ", selectedRows=" + (rowEnd - rowStart));
             batchWriter.flush();
+            logDiag("JAVA_FLUSH_AFTER",
+                    "rowStart=" + rowStart + ", rowEnd=" + rowEnd
+                            + ", selectedRows=" + (rowEnd - rowStart)
+                            + ", costMs=" + elapsedMs(flushStartNs));
             int rowsWrittenNow = rowEnd - rowStart;
             writtenRows += rowsWrittenNow;
             currentBlockWrittenBytes += batchEstimatedBytes;
             writtenBytes += batchEstimatedBytes;
+            logDiag("JAVA_BATCH_DONE",
+                    "rowsWrittenNow=" + rowsWrittenNow
+                            + ", nextRowStart=" + rowEnd
+                            + ", currentBlockWrittenBytes=" + 
currentBlockWrittenBytes
+                            + ", writtenRows=" + writtenRows
+                            + ", writtenBytes=" + writtenBytes
+                            + ", rotateAfterWrite=" + rotateAfterWrite);
             rowStart = rowEnd;
 
             if (rotateAfterWrite && rowStart < numRows) {
                 rotateCurrentBatchWriter();
             }
         }
+        logDiag("JAVA_WRITE_ROWS_EXIT", "rows=" + numRows + ", columns=" + 
numCols);
     }
 
     private static class CountingDiscardOutputStream extends OutputStream {
@@ -339,7 +478,7 @@ public class MaxComputeJniWriter extends JniWriter {
 
     private long estimateSingleRowPayloadBytes(VectorTable inputTable, int 
numCols, int rowIndex)
             throws IOException {
-        try (VectorSchemaRoot root = buildRowRangeRoot(inputTable, numCols, 
rowIndex, rowIndex + 1);
+        try (VectorSchemaRoot root = buildRowRangeRoot(inputTable, numCols, 
rowIndex, rowIndex + 1, false);
                 ArrowWriter estimator = 
ArrowWriterFactory.getRecordBatchWriter(
                         new CountingDiscardOutputStream(), writerOptions)) {
             estimator.writeBatch(root);
@@ -347,13 +486,32 @@ public class MaxComputeJniWriter extends JniWriter {
         }
     }
 
-    private VectorSchemaRoot buildRowRangeRoot(VectorTable inputTable, int 
numCols, int rowStart, int rowEnd) {
+    private VectorSchemaRoot buildRowRangeRoot(VectorTable inputTable, int 
numCols, int rowStart, int rowEnd,
+                                              boolean logColumns) {
         int rowCount = rowEnd - rowStart;
         VectorSchemaRoot root = batchWriter.newElement();
         root.setRowCount(rowCount);
         for (int col = 0; col < numCols && col < columnTypeInfos.size(); 
col++) {
+            long columnStartNs = System.nanoTime();
+            if (logColumns) {
+                logDiag("JAVA_BUILD_COLUMN_BEFORE",
+                        "columnIndex=" + col
+                                + ", columnName=" + columnNames.get(col)
+                                + ", odpsType=" + 
columnTypeInfos.get(col).getOdpsType()
+                                + ", rowStart=" + rowStart
+                                + ", rowCount=" + rowCount);
+            }
             fillArrowVectorStreaming(root, col, 
columnTypeInfos.get(col).getOdpsType(),
                     inputTable.getColumn(col), rowStart, rowCount);
+            if (logColumns) {
+                logDiag("JAVA_BUILD_COLUMN_AFTER",
+                        "columnIndex=" + col
+                                + ", columnName=" + columnNames.get(col)
+                                + ", odpsType=" + 
columnTypeInfos.get(col).getOdpsType()
+                                + ", rowStart=" + rowStart
+                                + ", rowCount=" + rowCount
+                                + ", costMs=" + elapsedMs(columnStartNs));
+            }
         }
         return root;
     }
@@ -904,28 +1062,42 @@ public class MaxComputeJniWriter extends JniWriter {
 
     @Override
     public void close() throws IOException {
+        long closeStartNs = System.nanoTime();
+        logDiag("JAVA_CLOSE_ENTER", "");
         try {
             closeCurrentBatchWriterAndCollectCommit();
             if (allocator != null) {
+                long allocatorStartNs = System.nanoTime();
+                logDiag("JAVA_ALLOCATOR_CLOSE_BEFORE", "");
                 allocator.close();
                 allocator = null;
+                logDiag("JAVA_ALLOCATOR_CLOSE_AFTER", "costMs=" + 
elapsedMs(allocatorStartNs));
             }
             LOG.info("MaxComputeJniWriter closed: writeSessionId=" + 
writeSessionId
                     + ", partitionSpec=" + partitionSpec
                     + ", writtenRows=" + writtenRows
                     + ", lastBlockId=" + currentBlockId
                     + ", commitMessageCount=" + commitMessages.size());
+            logDiag("JAVA_CLOSE_EXIT", "costMs=" + elapsedMs(closeStartNs));
         } catch (Exception e) {
             String errorMsg = "Failed to close MaxCompute arrow writer";
+            logDiag("JAVA_CLOSE_ERROR",
+                    "costMs=" + elapsedMs(closeStartNs)
+                            + ", error=" + e.getClass().getName() + ": " + 
e.getMessage());
             LOG.error(errorMsg, e);
             throw new IOException(errorMsg, e);
         } finally {
+            long feClientCloseStartNs = System.nanoTime();
+            logDiag("JAVA_FE_CLIENT_CLOSE_BEFORE", "");
             feClient.close();
+            logDiag("JAVA_FE_CLIENT_CLOSE_AFTER", "costMs=" + 
elapsedMs(feClientCloseStartNs));
         }
     }
 
     @Override
     public Map<String, String> getStatistics() {
+        long startNs = System.nanoTime();
+        logDiag("JAVA_GET_STATISTICS_ENTER", "commitMessageCount=" + 
commitMessages.size());
         Map<String, String> stats = new HashMap<>();
         stats.put("mc_partition_spec", partitionSpec != null ? partitionSpec : 
"");
 
@@ -936,8 +1108,14 @@ public class MaxComputeJniWriter extends JniWriter {
                 ObjectOutputStream oos = new ObjectOutputStream(baos);
                 oos.writeObject(commitMessages);
                 oos.close();
-                stats.put("mc_commit_message", 
Base64.getEncoder().encodeToString(baos.toByteArray()));
+                String encoded = 
Base64.getEncoder().encodeToString(baos.toByteArray());
+                stats.put("mc_commit_message", encoded);
+                logDiag("JAVA_GET_STATISTICS_COMMIT_MESSAGE_SERIALIZED",
+                        "commitMessageCount=" + commitMessages.size()
+                                + ", encodedLength=" + encoded.length());
             } catch (IOException e) {
+                logDiag("JAVA_GET_STATISTICS_ERROR",
+                        "error=" + e.getClass().getName() + ": " + 
e.getMessage());
                 LOG.error("Failed to serialize WriterCommitMessages", e);
             }
         }
@@ -946,6 +1124,29 @@ public class MaxComputeJniWriter extends JniWriter {
         stats.put("bytes:WrittenBytes", String.valueOf(writtenBytes));
         stats.put("timer:WriteTime", String.valueOf(writeTime));
         stats.put("timer:ReadTableTime", String.valueOf(readTableTime));
+        logDiag("JAVA_GET_STATISTICS_EXIT",
+                "statsSize=" + stats.size()
+                        + ", costMs=" + elapsedMs(startNs));
         return stats;
     }
+
+    private void logDiag(String stage, String detail) {
+        String message = "MC_DIAG stage=" + stage
+                + ", table=" + tableName
+                + ", txnId=" + txnId
+                + ", writeSessionId=" + writeSessionId
+                + ", partitionSpec=" + partitionSpec
+                + ", blockId=" + currentBlockId
+                + ", writtenRows=" + writtenRows
+                + ", writtenBytes=" + writtenBytes
+                + ", thread=" + Thread.currentThread().getName();
+        if (detail != null && !detail.isEmpty()) {
+            message += ", " + detail;
+        }
+        LOG.info(message);
+    }
+
+    private static long elapsedMs(long startNs) {
+        return (System.nanoTime() - startNs) / 1_000_000L;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java
index 9f1c61ddf24..fa9fcefab81 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java
@@ -64,8 +64,24 @@ public class MCTransaction implements Transaction {
     }
 
     public void updateMCCommitData(List<TMCCommitData> commitDataList) {
+        long incomingRows = 0;
+        int incomingCommitMessages = 0;
+        long incomingCommitMessageBytes = 0;
+        for (TMCCommitData data : commitDataList) {
+            incomingRows += data.getRowCount();
+            if (data.isSetCommitMessage() && 
!data.getCommitMessage().isEmpty()) {
+                incomingCommitMessages++;
+                incomingCommitMessageBytes += data.getCommitMessage().length();
+            }
+        }
         synchronized (this) {
+            LOG.info("MC_DIAG stage=FE_UPDATE_COMMIT_DATA table={} 
sessionId={} incomingDatas={} incomingRows={}"
+                            + " incomingCommitMessages={} 
incomingCommitMessageBytes={} existingDatas={}",
+                    table == null ? "null" : table.getName(), writeSessionId, 
commitDataList.size(), incomingRows,
+                    incomingCommitMessages, incomingCommitMessageBytes, 
this.commitDataList.size());
             this.commitDataList.addAll(commitDataList);
+            LOG.info("MC_DIAG stage=FE_UPDATE_COMMIT_DATA_DONE table={} 
sessionId={} totalDatas={}",
+                    table == null ? "null" : table.getName(), writeSessionId, 
this.commitDataList.size());
         }
     }
 
@@ -73,7 +89,15 @@ public class MCTransaction implements Transaction {
         this.table = (MaxComputeExternalTable) dorisTable;
 
         try {
+            long beginStartMs = System.currentTimeMillis();
+            LOG.info("MC_DIAG stage=FE_BEGIN_INSERT_START db={} table={}",
+                    table.getDbName(), table.getName());
+            long tableIdStartMs = System.currentTimeMillis();
+            LOG.info("MC_DIAG stage=FE_GET_ODPS_TABLE_ID_BEFORE db={} 
table={}",
+                    table.getDbName(), table.getName());
             TableIdentifier tableId = 
catalog.getOdpsTableIdentifier(table.getDbName(), table.getName());
+            LOG.info("MC_DIAG stage=FE_GET_ODPS_TABLE_ID_AFTER db={} table={} 
costMs={}",
+                    table.getDbName(), table.getName(), 
System.currentTimeMillis() - tableIdStartMs);
 
             boolean isDynamicPartition = 
!table.getPartitionColumns().isEmpty();
             boolean isStaticPartition = false;
@@ -94,6 +118,10 @@ public class MCTransaction implements Transaction {
                 }
                 isOverwrite = mcCtx.isOverwrite();
             }
+            LOG.info("MC_DIAG stage=FE_BEGIN_INSERT_ENTER table={}.{} 
dynamicPartition={} staticPartition={}"
+                            + " staticPartitionSpec={} overwrite={} 
partitionColumns={}",
+                    catalog.getDefaultProject(), table.getName(), 
isDynamicPartition, isStaticPartition,
+                    staticPartitionSpecStr, isOverwrite, 
table.getPartitionColumns().size());
 
             TableWriteSessionBuilder builder = new TableWriteSessionBuilder()
                     .identifier(tableId)
@@ -114,13 +142,25 @@ public class MCTransaction implements Transaction {
                 builder.overwrite(true);
             }
 
+            long buildSessionStartMs = System.currentTimeMillis();
+            LOG.info("MC_DIAG stage=FE_BUILD_WRITE_SESSION_BEFORE table={}.{} 
dynamicPartition={}"
+                            + " staticPartition={} overwrite={}",
+                    catalog.getDefaultProject(), table.getName(), 
isDynamicPartition, isStaticPartition, isOverwrite);
             TableBatchWriteSession writeSession = 
builder.buildBatchWriteSession();
             writeSessionId = writeSession.getId();
             nextBlockId.set(0);
+            LOG.info("MC_DIAG stage=FE_BUILD_WRITE_SESSION_AFTER table={}.{} 
sessionId={} costMs={}",
+                    catalog.getDefaultProject(), table.getName(), 
writeSessionId,
+                    System.currentTimeMillis() - buildSessionStartMs);
 
             LOG.info("Created MC Storage API write session: {} for table 
{}.{}",
                     writeSessionId, catalog.getDefaultProject(), 
table.getName());
+            LOG.info("MC_DIAG stage=FE_BEGIN_INSERT_EXIT table={}.{} 
sessionId={} costMs={}",
+                    catalog.getDefaultProject(), table.getName(), 
writeSessionId,
+                    System.currentTimeMillis() - beginStartMs);
         } catch (Exception e) {
+            LOG.warn("MC_DIAG stage=FE_BEGIN_INSERT_ERROR table={} error={}: 
{}",
+                    dorisTable.getName(), e.getClass().getName(), 
e.getMessage(), e);
             throw new UserException("Failed to begin insert for MaxCompute 
table "
                     + dorisTable.getName() + ": " + e.getMessage(), e);
         }
@@ -131,6 +171,11 @@ public class MCTransaction implements Transaction {
     }
 
     public long allocateBlockIdRange(String requestWriteSessionId, long 
length) throws UserException {
+        long startMs = System.currentTimeMillis();
+        LOG.info("MC_DIAG stage=FE_ALLOCATE_BLOCK_ID_ENTER table={} 
sessionId={} requestSessionId={} length={}"
+                        + " nextBlockId={}",
+                table == null ? "null" : table.getName(), writeSessionId, 
requestWriteSessionId, length,
+                nextBlockId.get());
         if (length <= 0) {
             throw new UserException("MaxCompute block_id allocation length 
must be positive: " + length);
         }
@@ -156,29 +201,48 @@ public class MCTransaction implements Transaction {
 
         LOG.info("Allocated MaxCompute block_id range: sessionId={}, start={}, 
length={}",
                 writeSessionId, start, length);
+        LOG.info("MC_DIAG stage=FE_ALLOCATE_BLOCK_ID_EXIT table={} 
sessionId={} start={} length={} nextBlockId={}"
+                        + " costMs={}",
+                table == null ? "null" : table.getName(), writeSessionId, 
start, length, nextBlockId.get(),
+                System.currentTimeMillis() - startMs);
         return start;
     }
 
     private void appendCommitMessages(List<WriterCommitMessage> allMessages, 
String encodedCommitMessage)
             throws Exception {
+        long startMs = System.currentTimeMillis();
+        LOG.info("MC_DIAG stage=FE_APPEND_COMMIT_MESSAGE_BEFORE table={} 
sessionId={} encodedLength={}",
+                table == null ? "null" : table.getName(), writeSessionId, 
encodedCommitMessage.length());
         byte[] bytes = Base64.getDecoder().decode(encodedCommitMessage);
         ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
         ObjectInputStream ois = new ObjectInputStream(bais);
         Object payload = ois.readObject();
         ois.close();
+        LOG.info("MC_DIAG stage=FE_APPEND_COMMIT_MESSAGE_DECODED table={} 
sessionId={} payloadType={}"
+                        + " decodedBytes={} costMs={}",
+                table == null ? "null" : table.getName(), writeSessionId,
+                payload == null ? "null" : payload.getClass().getName(), 
bytes.length,
+                System.currentTimeMillis() - startMs);
 
         if (payload instanceof WriterCommitMessage) {
             allMessages.add((WriterCommitMessage) payload);
+            LOG.info("MC_DIAG stage=FE_APPEND_COMMIT_MESSAGE_AFTER table={} 
sessionId={} added=1 totalMessages={}",
+                    table == null ? "null" : table.getName(), writeSessionId, 
allMessages.size());
             return;
         }
         if (payload instanceof List<?>) {
+            int added = 0;
             for (Object item : (List<?>) payload) {
                 if (!(item instanceof WriterCommitMessage)) {
                     throw new UserException("Unexpected MaxCompute commit 
payload item type: "
                             + (item == null ? "null" : 
item.getClass().getName()));
                 }
                 allMessages.add((WriterCommitMessage) item);
+                added++;
             }
+            LOG.info("MC_DIAG stage=FE_APPEND_COMMIT_MESSAGE_AFTER table={} 
sessionId={} added={}"
+                            + " totalMessages={}",
+                    table == null ? "null" : table.getName(), writeSessionId, 
added, allMessages.size());
             return;
         }
         throw new UserException("Unexpected MaxCompute commit payload type: "
@@ -188,33 +252,69 @@ public class MCTransaction implements Transaction {
     public void finishInsert() throws UserException {
         try {
             long t0 = System.currentTimeMillis();
+            LOG.info("MC_DIAG stage=FE_FINISH_INSERT_ENTER table={}.{} 
sessionId={} commitDataCount={}"
+                            + " updateRows={}",
+                    catalog.getDefaultProject(), table.getName(), 
writeSessionId, commitDataList.size(),
+                    getUpdateCnt());
             // Collect all WriterCommitMessages from BEs
             List<WriterCommitMessage> allMessages = new ArrayList<>();
             synchronized (this) {
+                LOG.info("MC_DIAG stage=FE_DESERIALIZE_COMMIT_MESSAGES_BEFORE 
table={}.{} sessionId={}"
+                                + " commitDataCount={}",
+                        catalog.getDefaultProject(), table.getName(), 
writeSessionId, commitDataList.size());
                 for (TMCCommitData data : commitDataList) {
                     if (data.isSetCommitMessage() && 
!data.getCommitMessage().isEmpty()) {
+                        LOG.info("MC_DIAG 
stage=FE_DESERIALIZE_COMMIT_MESSAGE_ITEM table={}.{} sessionId={}"
+                                        + " partitionSpec={} rowCount={} 
commitMessageLength={}",
+                                catalog.getDefaultProject(), table.getName(), 
writeSessionId,
+                                data.isSetPartitionSpec() ? 
data.getPartitionSpec() : "",
+                                data.getRowCount(), 
data.getCommitMessage().length());
                         appendCommitMessages(allMessages, 
data.getCommitMessage());
                     }
                 }
             }
             long t1 = System.currentTimeMillis();
+            LOG.info("MC_DIAG stage=FE_DESERIALIZE_COMMIT_MESSAGES_AFTER 
table={}.{} sessionId={}"
+                            + " writerCommitMessages={} costMs={}",
+                    catalog.getDefaultProject(), table.getName(), 
writeSessionId, allMessages.size(), t1 - t0);
 
             // Restore session and commit all messages
+            long tableIdStartMs = System.currentTimeMillis();
+            LOG.info("MC_DIAG stage=FE_COMMIT_GET_ODPS_TABLE_ID_BEFORE 
table={}.{} sessionId={}",
+                    catalog.getDefaultProject(), table.getName(), 
writeSessionId);
             TableIdentifier tableId = 
catalog.getOdpsTableIdentifier(table.getDbName(), table.getName());
+            LOG.info("MC_DIAG stage=FE_COMMIT_GET_ODPS_TABLE_ID_AFTER 
table={}.{} sessionId={} costMs={}",
+                    catalog.getDefaultProject(), table.getName(), 
writeSessionId,
+                    System.currentTimeMillis() - tableIdStartMs);
+            LOG.info("MC_DIAG stage=FE_RESTORE_COMMIT_SESSION_BEFORE 
table={}.{} sessionId={}",
+                    catalog.getDefaultProject(), table.getName(), 
writeSessionId);
             TableBatchWriteSession commitSession = new 
TableWriteSessionBuilder()
                     .identifier(tableId)
                     .withSessionId(writeSessionId)
                     .withSettings(catalog.getSettings())
                     .buildBatchWriteSession();
             long t2 = System.currentTimeMillis();
+            LOG.info("MC_DIAG stage=FE_RESTORE_COMMIT_SESSION_AFTER 
table={}.{} sessionId={} costMs={}",
+                    catalog.getDefaultProject(), table.getName(), 
writeSessionId, t2 - t1);
 
+            LOG.info("MC_DIAG stage=FE_COMMIT_SESSION_BEFORE table={}.{} 
sessionId={} writerCommitMessages={}",
+                    catalog.getDefaultProject(), table.getName(), 
writeSessionId, allMessages.size());
             commitSession.commit(allMessages.toArray(new 
WriterCommitMessage[0]));
             long t3 = System.currentTimeMillis();
+            LOG.info("MC_DIAG stage=FE_COMMIT_SESSION_AFTER table={}.{} 
sessionId={} writerCommitMessages={}"
+                            + " costMs={}",
+                    catalog.getDefaultProject(), table.getName(), 
writeSessionId, allMessages.size(), t3 - t2);
             LOG.info("Committed MC write session {} with {} messages for table 
{}.{}"
                             + " Breakdown: deserialize={}ms, 
restoreSession={}ms, commit={}ms, total={}ms",
                     writeSessionId, allMessages.size(), 
catalog.getDefaultProject(), table.getName(),
                     t1 - t0, t2 - t1, t3 - t2, t3 - t0);
+            LOG.info("MC_DIAG stage=FE_FINISH_INSERT_EXIT table={}.{} 
sessionId={} writerCommitMessages={}"
+                            + " totalCostMs={}",
+                    catalog.getDefaultProject(), table.getName(), 
writeSessionId, allMessages.size(), t3 - t0);
         } catch (Exception e) {
+            LOG.warn("MC_DIAG stage=FE_FINISH_INSERT_ERROR table={}.{} 
sessionId={} error={}: {}",
+                    catalog.getDefaultProject(), table == null ? "null" : 
table.getName(), writeSessionId,
+                    e.getClass().getName(), e.getMessage(), e);
             throw new UserException("Failed to commit MaxCompute write 
session: " + e.getMessage(), e);
         }
     }
@@ -222,11 +322,15 @@ public class MCTransaction implements Transaction {
     @Override
     public void commit() throws UserException {
         // commit is handled in finishInsert()
+        LOG.info("MC_DIAG stage=FE_TRANSACTION_COMMIT_NOOP table={} 
sessionId={}",
+                table == null ? "null" : table.getName(), writeSessionId);
     }
 
     @Override
     public void rollback() {
         // MC sessions auto-expire if not committed; no explicit rollback 
needed
+        LOG.info("MC_DIAG stage=FE_TRANSACTION_ROLLBACK table={} sessionId={} 
commitDataCount={}",
+                table == null ? "null" : table.getName(), writeSessionId, 
commitDataList.size());
         LOG.info("MCTransaction rollback called; uncommitted sessions will 
auto-expire.");
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/MCInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/MCInsertExecutor.java
index 47df06485e7..7d6cb608d31 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/MCInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/MCInsertExecutor.java
@@ -18,6 +18,7 @@
 package org.apache.doris.nereids.trees.plans.commands.insert;
 
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.datasource.maxcompute.MCTransaction;
 import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
 import org.apache.doris.nereids.NereidsPlanner;
@@ -52,29 +53,69 @@ public class MCInsertExecutor extends BaseExternalTableInsertExecutor {
 
     @Override
     protected void finalizeSink(PlanFragment fragment, DataSink sink, 
PhysicalSink physicalSink) {
+        LOG.info("MC_DIAG stage=FE_MC_FINALIZE_SINK_ENTER queryId={} label={} 
table={}",
+                DebugUtil.printId(ctx.queryId()), labelName, table.getName());
         // Let parent call bindDataSink() to build the Thrift sink
         super.finalizeSink(fragment, sink, physicalSink);
         // Save reference so beforeExec() can inject writeSessionId later
         mcTableSink = (MaxComputeTableSink) sink;
+        LOG.info("MC_DIAG stage=FE_MC_FINALIZE_SINK_EXIT queryId={} label={} 
table={} hasSink={}",
+                DebugUtil.printId(ctx.queryId()), labelName, table.getName(), 
mcTableSink != null);
     }
 
     @Override
     protected void beforeExec() throws UserException {
+        long startMs = System.currentTimeMillis();
+        LOG.info("MC_DIAG stage=FE_MC_BEFORE_EXEC_ENTER queryId={} label={} 
table={} txnId={}",
+                DebugUtil.printId(ctx.queryId()), labelName, table.getName(), 
txnId);
         // 1. Create Storage API write session as part of the transaction
         MCTransaction transaction = (MCTransaction) 
transactionManager.getTransaction(txnId);
+        long beginInsertStartMs = System.currentTimeMillis();
+        LOG.info("MC_DIAG stage=FE_MC_BEGIN_INSERT_BEFORE queryId={} label={} 
table={} txnId={}",
+                DebugUtil.printId(ctx.queryId()), labelName, table.getName(), 
txnId);
         transaction.beginInsert((MaxComputeExternalTable) table, insertCtx);
+        LOG.info("MC_DIAG stage=FE_MC_BEGIN_INSERT_AFTER queryId={} label={} 
table={} txnId={} sessionId={}"
+                        + " costMs={}",
+                DebugUtil.printId(ctx.queryId()), labelName, table.getName(), 
txnId,
+                transaction.getWriteSessionId(), System.currentTimeMillis() - 
beginInsertStartMs);
 
         // 2. Inject write context into the Thrift sink before fragments are 
sent to BE
         if (mcTableSink != null) {
+            LOG.info("MC_DIAG stage=FE_MC_SET_WRITE_CONTEXT_BEFORE queryId={} 
label={} table={} txnId={}"
+                            + " sessionId={}",
+                    DebugUtil.printId(ctx.queryId()), labelName, 
table.getName(), txnId,
+                    transaction.getWriteSessionId());
             mcTableSink.setWriteContext(txnId, 
transaction.getWriteSessionId());
+            LOG.info("MC_DIAG stage=FE_MC_SET_WRITE_CONTEXT_AFTER queryId={} 
label={} table={} txnId={}"
+                            + " sessionId={}",
+                    DebugUtil.printId(ctx.queryId()), labelName, 
table.getName(), txnId,
+                    transaction.getWriteSessionId());
+        } else {
+            LOG.info("MC_DIAG stage=FE_MC_SET_WRITE_CONTEXT_SKIP queryId={} 
label={} table={} txnId={}",
+                    DebugUtil.printId(ctx.queryId()), labelName, 
table.getName(), txnId);
         }
+        LOG.info("MC_DIAG stage=FE_MC_BEFORE_EXEC_EXIT queryId={} label={} 
table={} txnId={} sessionId={}"
+                        + " costMs={}",
+                DebugUtil.printId(ctx.queryId()), labelName, table.getName(), 
txnId,
+                transaction.getWriteSessionId(), System.currentTimeMillis() - 
startMs);
     }
 
     @Override
     protected void doBeforeCommit() throws UserException {
+        long startMs = System.currentTimeMillis();
+        LOG.info("MC_DIAG stage=FE_MC_DO_BEFORE_COMMIT_ENTER queryId={} 
label={} table={} txnId={}",
+                DebugUtil.printId(ctx.queryId()), labelName, table.getName(), 
txnId);
         MCTransaction transaction = (MCTransaction) 
transactionManager.getTransaction(txnId);
         loadedRows = transaction.getUpdateCnt();
+        LOG.info("MC_DIAG stage=FE_MC_FINISH_INSERT_BEFORE queryId={} label={} 
table={} txnId={} loadedRows={}"
+                        + " sessionId={}",
+                DebugUtil.printId(ctx.queryId()), labelName, table.getName(), 
txnId, loadedRows,
+                transaction.getWriteSessionId());
         transaction.finishInsert();
+        LOG.info("MC_DIAG stage=FE_MC_FINISH_INSERT_AFTER queryId={} label={} 
table={} txnId={} loadedRows={}"
+                        + " sessionId={} costMs={}",
+                DebugUtil.printId(ctx.queryId()), labelName, table.getName(), 
txnId, loadedRows,
+                transaction.getWriteSessionId(), System.currentTimeMillis() - 
startMs);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/MaxComputeTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/MaxComputeTableSink.java
index 98537fa0307..93be5a914fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/MaxComputeTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/MaxComputeTableSink.java
@@ -28,6 +28,9 @@ import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TMaxComputeTableSink;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -37,6 +40,8 @@ import java.util.stream.Collectors;
 
 public class MaxComputeTableSink extends BaseExternalTableDataSink {
 
+    private static final Logger LOG = 
LogManager.getLogger(MaxComputeTableSink.class);
+
     private final MaxComputeExternalTable targetTable;
 
     public MaxComputeTableSink(MaxComputeExternalTable targetTable) {
@@ -97,6 +102,11 @@ public class MaxComputeTableSink extends BaseExternalTableDataSink {
 
         tDataSink = new TDataSink(TDataSinkType.MAXCOMPUTE_TABLE_SINK);
         tDataSink.setMaxComputeTableSink(tSink);
+        LOG.info("MC_DIAG stage=FE_MC_BIND_SINK table={} project={} 
endpoint={} partitionColumns={}"
+                        + " hasStaticPartition={} connectTimeout={} 
readTimeout={} retryCount={}",
+                targetTable.getName(), catalog.getDefaultProject(), 
catalog.getEndpoint(),
+                partitionColumnNames.size(), tSink.isSetStaticPartitionSpec(), 
catalog.getConnectTimeout(),
+                catalog.getReadTimeout(), catalog.getRetryTimes());
     }
 
     /**
@@ -106,8 +116,15 @@ public class MaxComputeTableSink extends BaseExternalTableDataSink {
      */
     public void setWriteContext(long txnId, String writeSessionId) {
         if (tDataSink != null && tDataSink.isSetMaxComputeTableSink()) {
+            LOG.info("MC_DIAG stage=FE_MC_SET_SINK_WRITE_CONTEXT_BEFORE 
table={} txnId={} sessionId={}",
+                    targetTable.getName(), txnId, writeSessionId);
             tDataSink.getMaxComputeTableSink().setTxnId(txnId);
             
tDataSink.getMaxComputeTableSink().setWriteSessionId(writeSessionId);
+            LOG.info("MC_DIAG stage=FE_MC_SET_SINK_WRITE_CONTEXT_AFTER 
table={} txnId={} sessionId={}",
+                    targetTable.getName(), txnId, writeSessionId);
+        } else {
+            LOG.info("MC_DIAG stage=FE_MC_SET_SINK_WRITE_CONTEXT_SKIP table={} 
txnId={} sessionId={}",
+                    targetTable.getName(), txnId, writeSessionId);
         }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index dac24f2f798..321a5aab912 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2535,6 +2535,20 @@ public class Coordinator implements CoordInterface {
                 .updateIcebergCommitData(params.getIcebergCommitDatas());
         }
         if (params.isSetMcCommitDatas()) {
+            long mcRows = 0;
+            int mcCommitMessages = 0;
+            long mcCommitMessageBytes = 0;
+            for (org.apache.doris.thrift.TMCCommitData data : 
params.getMcCommitDatas()) {
+                mcRows += data.getRowCount();
+                if (data.isSetCommitMessage() && 
!data.getCommitMessage().isEmpty()) {
+                    mcCommitMessages++;
+                    mcCommitMessageBytes += data.getCommitMessage().length();
+                }
+            }
+            LOG.info("MC_DIAG stage=FE_COORDINATOR_MC_COMMIT_DATA queryId={} 
txnId={} fragmentId={} backendId={}"
+                            + " datas={} rows={} commitMessages={} 
commitMessageBytes={}",
+                    DebugUtil.printId(queryId), txnId, params.getFragmentId(), 
params.getBackendId(),
+                    params.getMcCommitDatasSize(), mcRows, mcCommitMessages, 
mcCommitMessageBytes);
             ((MCTransaction) 
Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().getTxnById(txnId))
                 .updateMCCommitData(params.getMcCommitDatas());
         }
@@ -3554,4 +3568,3 @@ public class Coordinator implements CoordInterface {
         this.queryOptions.setEnableProfile(isSafe && 
queryOptions.isEnableProfile());
     }
 }
-
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java
index 45995a7aad7..48c08700494 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java
@@ -231,6 +231,21 @@ public class LoadProcessor extends AbstractJobProcessor {
                     .updateIcebergCommitData(params.getIcebergCommitDatas());
         }
         if (params.isSetMcCommitDatas()) {
+            long mcRows = 0;
+            int mcCommitMessages = 0;
+            long mcCommitMessageBytes = 0;
+            for (org.apache.doris.thrift.TMCCommitData data : 
params.getMcCommitDatas()) {
+                mcRows += data.getRowCount();
+                if (data.isSetCommitMessage() && 
!data.getCommitMessage().isEmpty()) {
+                    mcCommitMessages++;
+                    mcCommitMessageBytes += data.getCommitMessage().length();
+                }
+            }
+            LOG.info("MC_DIAG stage=FE_LOAD_PROCESSOR_MC_COMMIT_DATA 
queryId={} txnId={} fragmentId={}"
+                            + " backendId={} datas={} rows={} 
commitMessages={} commitMessageBytes={}",
+                    DebugUtil.printId(coordinatorContext.queryId), txnId, 
params.getFragmentId(),
+                    params.getBackendId(), params.getMcCommitDatasSize(), 
mcRows, mcCommitMessages,
+                    mcCommitMessageBytes);
             ((MCTransaction) 
Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().getTxnById(txnId))
                     .updateMCCommitData(params.getMcCommitDatas());
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 99dd56cbe42..602d89f29bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2915,8 +2915,11 @@ public class FrontendServiceImpl implements FrontendService.Iface {
 
     @Override
     public TMaxComputeBlockIdResult 
getMaxComputeBlockIdRange(TMaxComputeBlockIdRequest request) {
+        long startMs = System.currentTimeMillis();
         String clientAddr = getClientAddrAsString();
         LOG.info("receive getMaxComputeBlockIdRange request: {}, backend: {}", 
request, clientAddr);
+        LOG.info("MC_DIAG stage=FE_BLOCK_ID_RPC_ENTER txnId={} sessionId={} 
length={} backend={}",
+                request.getTxnId(), request.getWriteSessionId(), 
request.getLength(), clientAddr);
 
         TMaxComputeBlockIdResult result = new TMaxComputeBlockIdResult();
         TStatus status = checkMaster();
@@ -2924,10 +2927,16 @@ public class FrontendServiceImpl implements FrontendService.Iface {
 
         if (status.getStatusCode() != TStatusCode.OK) {
             result.setMasterAddress(getMasterAddress());
+            LOG.info("MC_DIAG stage=FE_BLOCK_ID_RPC_NOT_MASTER txnId={} 
sessionId={} backend={} status={}"
+                            + " master={} costMs={}",
+                    request.getTxnId(), request.getWriteSessionId(), 
clientAddr, status.getStatusCode(),
+                    result.getMasterAddress(), System.currentTimeMillis() - 
startMs);
             return result;
         }
 
         try {
+            LOG.info("MC_DIAG stage=FE_BLOCK_ID_GET_TXN_BEFORE txnId={} 
sessionId={} backend={}",
+                    request.getTxnId(), request.getWriteSessionId(), 
clientAddr);
             Transaction transaction = 
Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr()
                     .getTxnById(request.getTxnId());
             if (!(transaction instanceof MCTransaction)) {
@@ -2935,25 +2944,42 @@ public class FrontendServiceImpl implements FrontendService.Iface {
                         + " is not a MaxCompute transaction");
             }
 
+            LOG.info("MC_DIAG stage=FE_BLOCK_ID_ALLOCATE_BEFORE txnId={} 
sessionId={} length={} backend={}",
+                    request.getTxnId(), request.getWriteSessionId(), 
request.getLength(), clientAddr);
             long start = ((MCTransaction) transaction).allocateBlockIdRange(
                     request.getWriteSessionId(), request.getLength());
             result.setStart(start);
             result.setLength(request.getLength());
+            LOG.info("MC_DIAG stage=FE_BLOCK_ID_ALLOCATE_AFTER txnId={} 
sessionId={} start={} length={}"
+                            + " backend={} costMs={}",
+                    request.getTxnId(), request.getWriteSessionId(), start, 
request.getLength(), clientAddr,
+                    System.currentTimeMillis() - startMs);
         } catch (UserException e) {
+            LOG.warn("MC_DIAG stage=FE_BLOCK_ID_RPC_USER_ERROR txnId={} 
sessionId={} backend={} error={}",
+                    request.getTxnId(), request.getWriteSessionId(), 
clientAddr, e.getMessage(), e);
             LOG.warn("failed to allocate MaxCompute block_id, txnId={}, 
sessionId={}, errmsg={}",
                     request.getTxnId(), request.getWriteSessionId(), 
e.getMessage());
             status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
             status.addToErrorMsgs(e.getMessage());
         } catch (RuntimeException e) {
+            LOG.warn("MC_DIAG stage=FE_BLOCK_ID_RPC_RUNTIME_ERROR txnId={} 
sessionId={} backend={} error={}",
+                    request.getTxnId(), request.getWriteSessionId(), 
clientAddr, e.getMessage(), e);
             LOG.warn("failed to allocate MaxCompute block_id, txnId={}, 
sessionId={}, errmsg={}",
                     request.getTxnId(), request.getWriteSessionId(), 
e.getMessage(), e);
             status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
             status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
         } catch (Throwable e) {
+            LOG.warn("MC_DIAG stage=FE_BLOCK_ID_RPC_UNKNOWN_ERROR txnId={} 
sessionId={} backend={} error={}",
+                    request.getTxnId(), request.getWriteSessionId(), 
clientAddr, e.getMessage(), e);
             LOG.warn("catch unknown result when allocating MaxCompute 
block_id.", e);
             status.setStatusCode(TStatusCode.INTERNAL_ERROR);
             status.addToErrorMsgs(e.getClass().getSimpleName() + ": " + 
Strings.nullToEmpty(e.getMessage()));
         }
+        LOG.info("MC_DIAG stage=FE_BLOCK_ID_RPC_EXIT txnId={} sessionId={} 
status={} startSet={} start={} lengthSet={}"
+                        + " length={} backend={} costMs={}",
+                request.getTxnId(), request.getWriteSessionId(), 
status.getStatusCode(), result.isSetStart(),
+                result.isSetStart() ? result.getStart() : -1, 
result.isSetLength(),
+                result.isSetLength() ? result.getLength() : -1, clientAddr, 
System.currentTimeMillis() - startMs);
         return result;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to