FayneBupt commented on code in PR #62689: URL: https://github.com/apache/doris/pull/62689#discussion_r3202194760
########## be/src/vec/sink/vpaimon_table_writer.cpp: ########## @@ -0,0 +1,740 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/sink/vpaimon_table_writer.h" + +#include <gen_cpp/DataSinks_types.h> + +#include <algorithm> +#include <cstdint> +#include <map> +#include <memory> +#include <string> +#include <string_view> +#include <utility> +#include <vector> + +#include "common/metrics/doris_metrics.h" +#include "core/block/block.h" +#include "core/column/column.h" +#include "exprs/vexpr.h" +#include "exprs/vexpr_context.h" +#include "runtime/query_context.h" +#include "runtime/runtime_profile.h" +#include "runtime/runtime_state.h" +#include "util/defer_op.h" +#include "vec/sink/paimon_writer_utils.h" +#include "vec/sink/writer/paimon/paimon_doris_hdfs_file_system.h" +#include "vec/sink/writer/paimon/vpaimon_partition_writer.h" + +#ifdef WITH_PAIMON_CPP +#include <arrow/array.h> +#include <arrow/c/bridge.h> +#include <arrow/record_batch.h> +#include <arrow/type.h> + +#include <cstdint> + +#include "format/arrow/arrow_block_convertor.h" +#include "format/arrow/arrow_row_batch.h" +#include "format/parquet/arrow_memory_pool.h" +#include "io/fs/hdfs_file_system.h" +#include "io/hdfs_builder.h" +#include 
"paimon/commit_message.h" +#include "paimon/factories/factory_creator.h" +#include "paimon/file_store_write.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/metrics.h" +#include "paimon/utils/bucket_id_calculator.h" +#include "paimon/write_context.h" +#include "vec/sink/writer/paimon/paimon_doris_memory_pool.h" + +// Force link paimon file format factories +namespace paimon { +namespace parquet {} +} // namespace paimon + +#endif + +namespace doris { +namespace vectorized { + +#ifdef WITH_PAIMON_CPP +namespace { +bool is_paimon_cpp_time_metric(std::string_view name) { + return name.size() > 3 && name.substr(name.size() - 3) == "_ns"; +} + +void attach_paimon_cpp_metrics_to_profile(RuntimeProfile* profile, + const std::shared_ptr<::paimon::Metrics>& metrics) { + if (profile == nullptr || !metrics) { + return; + } + auto all = metrics->GetAllCounters(); + if (all.empty()) { + return; + } + for (const auto& kv : all) { + std::string counter_name = "PaimonCpp_" + kv.first; + std::replace(counter_name.begin(), counter_name.end(), '.', '_'); + RuntimeProfile::Counter* counter = nullptr; + if (is_paimon_cpp_time_metric(kv.first)) { + counter = ADD_COUNTER(profile, counter_name, TUnit::TIME_NS); + } else { + counter = ADD_COUNTER(profile, counter_name, TUnit::UNIT); + } + COUNTER_UPDATE(counter, kv.second); + } +} +} // namespace +#endif + +VPaimonTableWriter::~VPaimonTableWriter() = default; + +VPaimonTableWriter::VPaimonTableWriter(const TDataSink& t_sink, + const VExprContextSPtrs& output_exprs) + : VPaimonTableWriter(t_sink, output_exprs, nullptr, nullptr) {} + +VPaimonTableWriter::VPaimonTableWriter(const TDataSink& t_sink, + const VExprContextSPtrs& output_exprs, + std::shared_ptr<Dependency> dep, + std::shared_ptr<Dependency> fin_dep) + : AsyncResultWriter(output_exprs, std::move(dep), std::move(fin_dep)), _t_sink(t_sink) { + DCHECK(_t_sink.__isset.paimon_table_sink); +} + +Status VPaimonTableWriter::init_properties(ObjectPool* /*pool*/) { + // Currently 
there is no extra property to initialize. Kept for symmetry + // with VIcebergTableWriter and future paimon-cpp wiring. + return Status::OK(); +} + +Status VPaimonTableWriter::open(RuntimeState* state, RuntimeProfile* profile) { + _state = state; + _profile = profile; +#ifndef WITH_PAIMON_CPP + return Status::NotSupported("paimon-cpp is not enabled"); +#else + _written_rows_counter = ADD_COUNTER(_profile, "WrittenRows", TUnit::UNIT); + _written_bytes_counter = ADD_COUNTER(_profile, "WrittenBytes", TUnit::BYTES); + _send_data_timer = ADD_TIMER(_profile, "SendDataTime"); + _project_timer = ADD_CHILD_TIMER(_profile, "ProjectTime", "SendDataTime"); + _bucket_calc_timer = ADD_CHILD_TIMER(_profile, "BucketCalcTime", "SendDataTime"); + _partition_writers_dispatch_timer = + ADD_CHILD_TIMER(_profile, "PartitionsDispatchTime", "SendDataTime"); + _partition_writers_write_timer = + ADD_CHILD_TIMER(_profile, "PartitionsWriteTime", "SendDataTime"); + _partition_writers_count = ADD_COUNTER(_profile, "PartitionsWriteCount", TUnit::UNIT); + _partition_writer_created = ADD_COUNTER(_profile, "PartitionWriterCreated", TUnit::UNIT); + _open_timer = ADD_TIMER(_profile, "OpenTime"); + _close_timer = ADD_TIMER(_profile, "CloseTime"); + _prepare_commit_timer = ADD_TIMER(_profile, "PrepareCommitTime"); + _serialize_commit_messages_timer = ADD_TIMER(_profile, "SerializeCommitMessagesTime"); + _commit_payload_bytes_counter = ADD_COUNTER(_profile, "CommitPayloadBytes", TUnit::BYTES); + + SCOPED_TIMER(_open_timer); + + ensure_paimon_doris_hdfs_file_system_registered(); + + auto registered_types = paimon::FactoryCreator::GetInstance()->GetRegisteredType(); + std::string types_str; + bool has_parquet = false; + for (const auto& t : registered_types) { + types_str += t + ", "; + has_parquet |= (t == "parquet"); + } + if (!has_parquet) { + return Status::InternalError( + "paimon-cpp parquet file format factory is not registered (missing 'parquet' in " + "FactoryCreator). 
Please ensure BE is built with WITH_PAIMON_CPP=ON and linked " + "with libpaimon_parquet_file_format.a (whole-archive). Registered factories: {}", + types_str); + } + + _pool = std::make_shared<PaimonDorisMemoryPool>(_state->query_mem_tracker()); + const auto& paimon_sink = _t_sink.paimon_table_sink; + if (!paimon_sink.__isset.table_location || paimon_sink.table_location.empty()) { + return Status::InvalidArgument("paimon table location is empty"); + } + std::string commit_user; + if (paimon_sink.__isset.options) { + auto it = paimon_sink.options.find("doris.commit_user"); + if (it != paimon_sink.options.end()) { + commit_user = it->second; + } + } + if (commit_user.empty()) { + commit_user = _state->user(); + } + + std::map<std::string, std::string> options; + if (paimon_sink.__isset.options) { + for (const auto& kv : paimon_sink.options) { + if (kv.first.rfind("doris.", 0) == 0) { + continue; + } + options.emplace(kv.first, kv.second); + } + } + + // Workaround for paimon-cpp issue where it defaults to LocalFileSystem if path has no scheme. + // If table_location is missing scheme (common in HDFS setup without full URI), + // and fs.defaultFS is provided in options, we prepend it. 
+ std::string table_location = paimon_sink.table_location; + if (table_location.find("://") == std::string::npos) { + auto it = options.find("fs.defaultFS"); + if (it != options.end()) { + std::string default_fs = it->second; + // Remove trailing slash from default_fs if present + while (!default_fs.empty() && default_fs.back() == '/') { + default_fs.pop_back(); + } + // Remove leading slash from table_location if present + if (!table_location.empty() && table_location.front() == '/') { + table_location = default_fs + table_location; + } else { + table_location = default_fs + "/" + table_location; + } + } + } + + int64_t buffer_size = 256 * 1024 * 1024L; // Default 256MB + + if (_state->query_options().__isset.paimon_write_buffer_size && + _state->query_options().paimon_write_buffer_size > 0) { + buffer_size = _state->query_options().paimon_write_buffer_size; + } + + bool enable_adaptive = true; + if (_state->query_options().__isset.enable_paimon_adaptive_buffer_size) { + enable_adaptive = _state->query_options().enable_paimon_adaptive_buffer_size; + } + + if (enable_adaptive && paimon_sink.__isset.bucket_num && paimon_sink.bucket_num > 0) { + int bucket_num = paimon_sink.bucket_num; + buffer_size = get_paimon_write_buffer_size(buffer_size, true, bucket_num); + LOG(INFO) << "Adaptive Paimon Buffer Size: bucket_num=" << bucket_num + << ", adjusted_buffer_size=" << buffer_size; + } + LOG(INFO) << "Paimon Native Writer Final Buffer Size: " << buffer_size + << " (enable_adaptive=" << enable_adaptive << ")"; + options["write-buffer-size"] = std::to_string(buffer_size); + + if (_state->query_options().__isset.paimon_target_file_size && + _state->query_options().paimon_target_file_size > 0) { + options["target-file-size"] = + std::to_string(_state->query_options().paimon_target_file_size); + LOG(INFO) << "Paimon Native Writer Target File Size: " + << _state->query_options().paimon_target_file_size; + } + + options["file.format"] = "parquet"; Review Comment: Fixed by 
rejecting ORC tables on the native paimon-cpp write path. The FE now forwards the table's file.format and manifest.format options and raises an AnalysisException when an insert with enable_paimon_jni_writer=false targets an ORC table; the BE also keeps a fallback check in the native writer. For now, inserts into ORC tables should go through the JNI writer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
