github-actions[bot] commented on code in PR #61820:
URL: https://github.com/apache/doris/pull/61820#discussion_r3018928211
##########
gensrc/thrift/Types.thrift:
##########
@@ -102,7 +102,8 @@ enum TPrimitiveType {
UINT64 = 41, // only used in BE to represent offsets
FIXED_LENGTH_OBJECT = 42 // only used in BE to represent fixed-length object
VARBINARY = 43, // represent varbinary type
- TIMESTAMPTZ = 44 // timestamp with time zone
+ TIMESTAMPTZ = 44, // timestamp with time zone
+ FILE = 45
Review Comment:
This adds a new FE/BE protocol scalar type without any mixed-version guard.
During a rolling upgrade, a new FE can plan `to_file()` with return type
`TPrimitiveType.FILE` (45), but an older BE does not know that enum value and
will fall into its unknown-type path when decoding the plan. Doris review rules
require protocol changes to remain safe during rolling upgrades.
Please gate planning on `be_exec_version`, or lower the wire representation
to the existing physical `STRUCT` shape until all BEs are upgraded.
##########
be/src/exprs/function/function_file.cpp:
##########
@@ -0,0 +1,274 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cctype>
+#include <charconv>
+#include <curl/curl.h>
+#include <memory>
+#include <optional>
+#include <string>
+#include <string_view>
+#include <vector>
+
+#include "common/status.h"
+#include "core/binary_cast.hpp"
+#include "core/assert_cast.h"
+#include "core/block/block.h"
+#include "core/block/column_numbers.h"
+#include "core/block/column_with_type_and_name.h"
+#include "core/column/column_file.h"
+#include "core/column/column_nullable.h"
+#include "core/column/column_string.h"
+#include "core/column/column_struct.h"
+#include "core/column/column_vector.h"
+#include "core/data_type/data_type_file.h"
+#include "core/data_type/file_schema_descriptor.h"
+#include "core/data_type/primitive_type.h"
+#include "core/types.h"
+#include "core/value/vdatetime_value.h"
+#include "exprs/function/function.h"
+#include "exprs/function/simple_function_factory.h"
+#include "service/http/http_client.h"
+#include "service/http/http_headers.h"
+
+namespace doris {
+
+namespace {
+
+struct FileViewdata {
+ int64_t size;
+ std::optional<std::string> etag;
+ uint64_t last_modified_at;
+};
+
+std::string strip_url_suffix(std::string_view uri) {
+ size_t end = uri.find_first_of("?#");
+ if (end == std::string_view::npos) {
+ return std::string(uri);
+ }
+ return std::string(uri.substr(0, end));
+}
+
+std::string extract_file_name(std::string_view uri) {
+ std::string normalized = strip_url_suffix(uri);
+ size_t pos = normalized.find_last_of('/');
+ if (pos == std::string::npos) {
+ return normalized;
+ }
+ if (pos + 1 >= normalized.size()) {
+ return "";
+ }
+ return normalized.substr(pos + 1);
+}
+
+std::string extract_file_extension(const std::string& file_name) {
+ size_t pos = file_name.find_last_of('.');
+ if (pos == std::string::npos) {
+ return "";
+ }
+ std::string extension = file_name.substr(pos);
+ std::transform(extension.begin(), extension.end(), extension.begin(),
+ [](unsigned char ch) { return
static_cast<char>(std::tolower(ch)); });
+ return extension;
+}
+
+std::string extract_url_scheme(std::string_view uri) {
+ size_t pos = uri.find("://");
+ if (pos == std::string_view::npos) {
+ return "";
+ }
+ std::string scheme(uri.substr(0, pos));
+ std::transform(scheme.begin(), scheme.end(), scheme.begin(),
+ [](unsigned char ch) { return
static_cast<char>(std::tolower(ch)); });
+ return scheme;
+}
+
+Result<FileViewdata> fetch_file_metadata(std::string_view url) {
+ const std::string scheme = extract_url_scheme(url);
+ if (scheme != "http" && scheme != "https") {
+ return ResultError(Status::InvalidArgument(
+ "to_file(url) only supports HTTP(S) object URLs with embedded
auth info, got: {}",
+ url));
+ }
+
+ auto read_common_headers = [&](HttpClient& client,
+ std::optional<uint64_t> fallback_length) ->
Result<FileViewdata> {
+ uint64_t content_length = fallback_length.value_or(0);
+ if (!fallback_length.has_value()) {
+ RETURN_IF_ERROR_RESULT(client.get_content_length(&content_length));
+ }
+
+ std::string etag;
+ RETURN_IF_ERROR_RESULT(client.get_header(HttpHeaders::ETAG, &etag));
+
+ std::string last_modified;
+ RETURN_IF_ERROR_RESULT(client.get_header(HttpHeaders::LAST_MODIFIED,
&last_modified));
+ if (last_modified.empty()) {
+ return ResultError(Status::InvalidArgument(
+ "to_file(url) requires Last-Modified header from object
storage, url={}",
+ url));
+ }
+
+ time_t ts = curl_getdate(last_modified.c_str(), nullptr);
+ if (ts < 0) {
+ return ResultError(Status::InvalidArgument(
+ "failed to parse Last-Modified header '{}' for url={}",
last_modified, url));
+ }
+
+ DateV2Value<DateTimeV2ValueType> dt;
+ dt.from_unixtime(ts, cctz::utc_time_zone());
+ return FileViewdata {
+ .size = static_cast<int64_t>(content_length),
+ .etag = etag.empty() ? std::nullopt :
std::optional<std::string>(std::move(etag)),
+ .last_modified_at =
binary_cast<DateV2Value<DateTimeV2ValueType>, UInt64>(dt),
+ };
+ };
+
+ auto parse_total_size_from_content_range = [&](std::string_view
content_range)
+ -> Result<uint64_t> {
+ const size_t slash_pos = content_range.rfind('/');
+ if (slash_pos == std::string_view::npos || slash_pos + 1 >=
content_range.size()) {
+ return ResultError(Status::InvalidArgument(
+ "invalid Content-Range header '{}' for url={}",
content_range, url));
+ }
+ std::string_view total_size_str = content_range.substr(slash_pos + 1);
+ uint64_t total_size = 0;
+ const auto [ptr, ec] = std::from_chars(total_size_str.data(),
+ total_size_str.data() +
total_size_str.size(),
+ total_size);
+ if (ec != std::errc() || ptr != total_size_str.data() +
total_size_str.size()) {
+ return ResultError(Status::InvalidArgument(
+ "invalid Content-Range total size '{}' for url={}",
total_size_str, url));
+ }
+ return total_size;
+ };
+
+ HttpClient client;
+ RETURN_IF_ERROR_RESULT(client.init(std::string(url), false));
+ client.set_method(HEAD);
+ client.set_unrestricted_auth(1);
+ RETURN_IF_ERROR_RESULT(client.execute());
+
+ const long head_status = client.get_http_status();
+ if (head_status >= 200 && head_status < 300) {
+ return read_common_headers(client, std::nullopt);
+ }
+
+ RETURN_IF_ERROR_RESULT(client.init(std::string(url), false));
+ client.set_method(GET);
+ client.set_unrestricted_auth(1);
+ client.set_range(0, 1);
+ RETURN_IF_ERROR_RESULT(client.execute([](const void*, size_t) { return
true; }));
+
+ const long get_status = client.get_http_status();
+ if (get_status < 200 || get_status >= 300) {
+ return ResultError(Status::HttpError(
+ "failed to fetch file metadata, HEAD status={}, GET status={},
url={}", head_status,
+ get_status, url));
+ }
+
+ std::optional<uint64_t> total_size;
+ std::string content_range;
+ RETURN_IF_ERROR_RESULT(client.get_header(HttpHeaders::CONTENT_RANGE,
&content_range));
+ if (!content_range.empty()) {
+ auto total_size_result =
parse_total_size_from_content_range(content_range);
+ if (!total_size_result.has_value()) {
+ return ResultError(total_size_result.error());
+ }
+ total_size = total_size_result.value();
+ }
+
+ if (!total_size.has_value()) {
+ uint64_t content_length = 0;
+ RETURN_IF_ERROR_RESULT(client.get_content_length(&content_length));
+ total_size = content_length;
+ }
+
+ return read_common_headers(client, total_size);
+}
+
+} // namespace
+
+class FunctionToFile : public IFunction {
+public:
+ static constexpr auto name = "to_file";
+
+ static FunctionPtr create() { return std::make_shared<FunctionToFile>(); }
+
+ String get_name() const override { return name; }
+
+ size_t get_number_of_arguments() const override { return 1; }
+
+ DataTypePtr get_return_type_impl(const DataTypes& arguments) const
override {
+ return std::make_shared<DataTypeFile>();
+ }
+
+ Status execute_impl(FunctionContext* context, Block& block, const
ColumnNumbers& arguments,
+ uint32_t result, size_t input_rows_count) const
override {
+ DCHECK_EQ(arguments.size(), 1);
+
+ ColumnPtr uri_col_ptr =
+
block.get_by_position(arguments[0]).column->convert_to_full_column_if_const();
+ const ColumnString* uri_col = nullptr;
+ if (const auto* nullable =
check_and_get_column<ColumnNullable>(uri_col_ptr.get())) {
Review Comment:
`execute_impl()` is still not safe for nullable inputs. Doris's default null
handling unwraps `Nullable(String)` arguments to their nested `ColumnString`
before calling the function body, and rows whose SQL value is `NULL` keep
arbitrary nested payloads. The explicit `ColumnNullable` branch here likewise
reads only the nested column and never consults the null map. As a result, the
loop still calls `fetch_file_metadata()` for those rows, so
`select to_file(nullable_url)` can fail on a null row with an
unsupported-scheme / empty-URL error instead of returning `NULL`.
Please override `use_default_implementation_for_nulls()` and honor the null
map explicitly, or skip null rows before doing the remote fetch.
##########
be/src/exprs/function/function_file.cpp:
##########
@@ -0,0 +1,274 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cctype>
+#include <charconv>
+#include <curl/curl.h>
+#include <memory>
+#include <optional>
+#include <string>
+#include <string_view>
+#include <vector>
+
+#include "common/status.h"
+#include "core/binary_cast.hpp"
+#include "core/assert_cast.h"
+#include "core/block/block.h"
+#include "core/block/column_numbers.h"
+#include "core/block/column_with_type_and_name.h"
+#include "core/column/column_file.h"
+#include "core/column/column_nullable.h"
+#include "core/column/column_string.h"
+#include "core/column/column_struct.h"
+#include "core/column/column_vector.h"
+#include "core/data_type/data_type_file.h"
+#include "core/data_type/file_schema_descriptor.h"
+#include "core/data_type/primitive_type.h"
+#include "core/types.h"
+#include "core/value/vdatetime_value.h"
+#include "exprs/function/function.h"
+#include "exprs/function/simple_function_factory.h"
+#include "service/http/http_client.h"
+#include "service/http/http_headers.h"
+
+namespace doris {
+
+namespace {
+
+struct FileViewdata {
+ int64_t size;
+ std::optional<std::string> etag;
+ uint64_t last_modified_at;
+};
+
+std::string strip_url_suffix(std::string_view uri) {
+ size_t end = uri.find_first_of("?#");
+ if (end == std::string_view::npos) {
+ return std::string(uri);
+ }
+ return std::string(uri.substr(0, end));
+}
+
+std::string extract_file_name(std::string_view uri) {
+ std::string normalized = strip_url_suffix(uri);
+ size_t pos = normalized.find_last_of('/');
+ if (pos == std::string::npos) {
+ return normalized;
+ }
+ if (pos + 1 >= normalized.size()) {
+ return "";
+ }
+ return normalized.substr(pos + 1);
+}
+
+std::string extract_file_extension(const std::string& file_name) {
+ size_t pos = file_name.find_last_of('.');
+ if (pos == std::string::npos) {
+ return "";
+ }
+ std::string extension = file_name.substr(pos);
+ std::transform(extension.begin(), extension.end(), extension.begin(),
+ [](unsigned char ch) { return
static_cast<char>(std::tolower(ch)); });
+ return extension;
+}
+
+std::string extract_url_scheme(std::string_view uri) {
+ size_t pos = uri.find("://");
+ if (pos == std::string_view::npos) {
+ return "";
+ }
+ std::string scheme(uri.substr(0, pos));
+ std::transform(scheme.begin(), scheme.end(), scheme.begin(),
+ [](unsigned char ch) { return
static_cast<char>(std::tolower(ch)); });
+ return scheme;
+}
+
+Result<FileViewdata> fetch_file_metadata(std::string_view url) {
+ const std::string scheme = extract_url_scheme(url);
+ if (scheme != "http" && scheme != "https") {
+ return ResultError(Status::InvalidArgument(
+ "to_file(url) only supports HTTP(S) object URLs with embedded
auth info, got: {}",
+ url));
+ }
+
+ auto read_common_headers = [&](HttpClient& client,
+ std::optional<uint64_t> fallback_length) ->
Result<FileViewdata> {
+ uint64_t content_length = fallback_length.value_or(0);
+ if (!fallback_length.has_value()) {
+ RETURN_IF_ERROR_RESULT(client.get_content_length(&content_length));
+ }
+
+ std::string etag;
+ RETURN_IF_ERROR_RESULT(client.get_header(HttpHeaders::ETAG, &etag));
+
+ std::string last_modified;
+ RETURN_IF_ERROR_RESULT(client.get_header(HttpHeaders::LAST_MODIFIED,
&last_modified));
+ if (last_modified.empty()) {
+ return ResultError(Status::InvalidArgument(
+ "to_file(url) requires Last-Modified header from object
storage, url={}",
+ url));
+ }
+
+ time_t ts = curl_getdate(last_modified.c_str(), nullptr);
+ if (ts < 0) {
+ return ResultError(Status::InvalidArgument(
+ "failed to parse Last-Modified header '{}' for url={}",
last_modified, url));
+ }
+
+ DateV2Value<DateTimeV2ValueType> dt;
+ dt.from_unixtime(ts, cctz::utc_time_zone());
+ return FileViewdata {
+ .size = static_cast<int64_t>(content_length),
+ .etag = etag.empty() ? std::nullopt :
std::optional<std::string>(std::move(etag)),
+ .last_modified_at =
binary_cast<DateV2Value<DateTimeV2ValueType>, UInt64>(dt),
+ };
+ };
+
+ auto parse_total_size_from_content_range = [&](std::string_view
content_range)
+ -> Result<uint64_t> {
+ const size_t slash_pos = content_range.rfind('/');
+ if (slash_pos == std::string_view::npos || slash_pos + 1 >=
content_range.size()) {
+ return ResultError(Status::InvalidArgument(
+ "invalid Content-Range header '{}' for url={}",
content_range, url));
+ }
+ std::string_view total_size_str = content_range.substr(slash_pos + 1);
+ uint64_t total_size = 0;
+ const auto [ptr, ec] = std::from_chars(total_size_str.data(),
+ total_size_str.data() +
total_size_str.size(),
+ total_size);
+ if (ec != std::errc() || ptr != total_size_str.data() +
total_size_str.size()) {
+ return ResultError(Status::InvalidArgument(
+ "invalid Content-Range total size '{}' for url={}",
total_size_str, url));
+ }
+ return total_size;
+ };
+
+ HttpClient client;
+ RETURN_IF_ERROR_RESULT(client.init(std::string(url), false));
+ client.set_method(HEAD);
+ client.set_unrestricted_auth(1);
+ RETURN_IF_ERROR_RESULT(client.execute());
+
+ const long head_status = client.get_http_status();
+ if (head_status >= 200 && head_status < 300) {
+ return read_common_headers(client, std::nullopt);
+ }
+
+ RETURN_IF_ERROR_RESULT(client.init(std::string(url), false));
+ client.set_method(GET);
+ client.set_unrestricted_auth(1);
+ client.set_range(0, 1);
+ RETURN_IF_ERROR_RESULT(client.execute([](const void*, size_t) { return
true; }));
+
+ const long get_status = client.get_http_status();
+ if (get_status < 200 || get_status >= 300) {
+ return ResultError(Status::HttpError(
+ "failed to fetch file metadata, HEAD status={}, GET status={},
url={}", head_status,
+ get_status, url));
+ }
+
+ std::optional<uint64_t> total_size;
+ std::string content_range;
+ RETURN_IF_ERROR_RESULT(client.get_header(HttpHeaders::CONTENT_RANGE,
&content_range));
+ if (!content_range.empty()) {
+ auto total_size_result =
parse_total_size_from_content_range(content_range);
+ if (!total_size_result.has_value()) {
+ return ResultError(total_size_result.error());
+ }
+ total_size = total_size_result.value();
+ }
+
+ if (!total_size.has_value()) {
+ uint64_t content_length = 0;
+ RETURN_IF_ERROR_RESULT(client.get_content_length(&content_length));
+ total_size = content_length;
+ }
+
+ return read_common_headers(client, total_size);
+}
+
+} // namespace
+
+class FunctionToFile : public IFunction {
+public:
+ static constexpr auto name = "to_file";
+
+ static FunctionPtr create() { return std::make_shared<FunctionToFile>(); }
+
+ String get_name() const override { return name; }
+
+ size_t get_number_of_arguments() const override { return 1; }
+
+ DataTypePtr get_return_type_impl(const DataTypes& arguments) const
override {
+ return std::make_shared<DataTypeFile>();
+ }
+
+ Status execute_impl(FunctionContext* context, Block& block, const
ColumnNumbers& arguments,
+ uint32_t result, size_t input_rows_count) const
override {
+ DCHECK_EQ(arguments.size(), 1);
+
+ ColumnPtr uri_col_ptr =
+
block.get_by_position(arguments[0]).column->convert_to_full_column_if_const();
+ const ColumnString* uri_col = nullptr;
+ if (const auto* nullable =
check_and_get_column<ColumnNullable>(uri_col_ptr.get())) {
+ uri_col = &assert_cast<const
ColumnString&>(nullable->get_nested_column());
+ } else {
+ uri_col = &assert_cast<const ColumnString&>(*uri_col_ptr);
+ }
+
+ const auto& schema = FileSchemaDescriptor::instance();
+ auto result_col = ColumnFile::create(schema);
+ auto& struct_col = result_col->get_struct_column();
+ struct_col.reserve(input_rows_count);
+
+ auto& object_uri_col =
assert_cast<ColumnString&>(struct_col.get_column(0));
+ auto& file_name_col =
assert_cast<ColumnString&>(struct_col.get_column(1));
+ auto& file_ext_col =
assert_cast<ColumnString&>(struct_col.get_column(2));
+ auto& size_col = assert_cast<ColumnInt64&>(struct_col.get_column(3));
+ auto& etag_col =
assert_cast<ColumnNullable&>(struct_col.get_column(4));
+ auto& mtime_col =
assert_cast<ColumnDateTimeV2&>(struct_col.get_column(5));
+
+ for (size_t row = 0; row < input_rows_count; ++row) {
+ StringRef uri_ref = uri_col->get_data_at(row);
+ std::string uri = uri_ref.to_string();
+ std::string file_name = extract_file_name(uri);
+ std::string file_ext = extract_file_extension(file_name);
+ FileViewdata metadata = DORIS_TRY(fetch_file_metadata(uri));
Review Comment:
`fetch_file_metadata()` performs synchronous libcurl I/O here, but
`FunctionToFile` still inherits the default `is_blockable()`, which returns
`false`. The pipeline scheduler relies on expression trees reporting blocking
work via `VExpr::contains_blockable_function()` /
`PipelineXLocalStateBase::is_blockable()`. As written, a projection that uses
`to_file()` can keep pipeline worker threads occupied while they wait on remote
object storage. Each row costs up to two HTTP round trips: a HEAD request, then
a ranged GET fallback. Existing network-backed scalar functions such as
`AIFunction` override `is_blockable()` for this reason; please override it here
to return `true` as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]