This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 3d23f8b2b5 [Opt](functions) Use preloaded cache to accelerate timezone parsing (#22694) (#23510)
3d23f8b2b5 is described below

commit 3d23f8b2b51503ba6f0e03977932751054709fcc
Author: zclllyybb <[email protected]>
AuthorDate: Fri Aug 25 21:27:17 2023 +0800

    [Opt](functions) Use preloaded cache to accelerate timezone parsing (#22694) (#23510)
---
 be/src/http/action/stream_load.cpp                 |   3 -
 be/src/http/http_common.h                          |   1 -
 be/src/runtime/exec_env.h                          |   8 +
 be/src/runtime/exec_env_init.cpp                   |   4 +
 be/src/util/timezone_utils.cpp                     | 177 ++++++++++++++++++++-
 be/src/util/timezone_utils.h                       |   6 +-
 be/src/vec/functions/function_cast.h               |  42 +++--
 be/src/vec/functions/function_convert_tz.h         |  55 ++++---
 be/src/vec/io/io_helper.h                          |  25 +--
 be/src/vec/runtime/vdatetime_value.cpp             |  48 +++---
 be/src/vec/runtime/vdatetime_value.h               |  12 +-
 be/test/vec/function/function_time_test.cpp        |  32 ----
 .../datetimev2/test_tz_streamload.groovy           |   2 +-
 13 files changed, 294 insertions(+), 121 deletions(-)

diff --git a/be/src/http/action/stream_load.cpp 
b/be/src/http/action/stream_load.cpp
index b31f4fe331..837a84f5a5 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -472,9 +472,6 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
     if (!http_req->header(HTTP_TIMEZONE).empty()) {
         request.__set_timezone(http_req->header(HTTP_TIMEZONE));
     }
-    if (!http_req->header(HTTP_TIME_ZONE).empty()) {
-        request.__set_timezone(http_req->header(HTTP_TIME_ZONE));
-    }
     if (!http_req->header(HTTP_EXEC_MEM_LIMIT).empty()) {
         try {
             
request.__set_execMemLimit(std::stoll(http_req->header(HTTP_EXEC_MEM_LIMIT)));
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index afcbc3ff0d..616ae872f0 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -38,7 +38,6 @@ static const std::string HTTP_TEMP_PARTITIONS = 
"temporary_partitions";
 static const std::string HTTP_NEGATIVE = "negative";
 static const std::string HTTP_STRICT_MODE = "strict_mode";
 static const std::string HTTP_TIMEZONE = "timezone";
-static const std::string HTTP_TIME_ZONE = "time_zone";
 static const std::string HTTP_EXEC_MEM_LIMIT = "exec_mem_limit";
 static const std::string HTTP_JSONPATHS = "jsonpaths";
 static const std::string HTTP_JSONROOT = "json_root";
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 40c489efb5..93db9b4f48 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -22,6 +22,7 @@
 #include <algorithm>
 #include <map>
 #include <memory>
+#include <shared_mutex>
 #include <string>
 #include <unordered_map>
 #include <vector>
@@ -29,11 +30,13 @@
 #include "common/status.h"
 #include "olap/options.h"
 #include "util/threadpool.h"
+#include "vec/common/hash_table/phmap_fwd_decl.h"
 
 namespace doris {
 namespace vectorized {
 class VDataStreamMgr;
 class ScannerScheduler;
+using ZoneList = std::unordered_map<std::string, cctz::time_zone>;
 } // namespace vectorized
 namespace pipeline {
 class TaskScheduler;
@@ -172,6 +175,8 @@ public:
     HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; }
     doris::vectorized::ScannerScheduler* scanner_scheduler() { return 
_scanner_scheduler; }
     FileMetaCache* file_meta_cache() { return _file_meta_cache; }
+    vectorized::ZoneList& global_zone_cache() { return *_global_zone_cache; }
+    std::shared_mutex& zone_cache_rw_lock() { return _zone_cache_rw_lock; }
 
     // only for unit test
     void set_master_info(TMasterInfo* master_info) { this->_master_info = 
master_info; }
@@ -255,6 +260,9 @@ private:
     BlockSpillManager* _block_spill_mgr = nullptr;
     // To save meta info of external file, such as parquet footer.
     FileMetaCache* _file_meta_cache = nullptr;
+
+    std::unique_ptr<vectorized::ZoneList> _global_zone_cache;
+    std::shared_mutex _zone_cache_rw_lock;
 };
 
 template <>
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 6b7e337ecd..5fca5e325e 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -77,6 +77,7 @@
 #include "util/parse_util.h"
 #include "util/pretty_printer.h"
 #include "util/threadpool.h"
+#include "util/timezone_utils.h"
 #include "vec/exec/scan/scanner_scheduler.h"
 #include "vec/runtime/vdata_stream_mgr.h"
 
@@ -116,6 +117,9 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths) {
 
     TimezoneUtils::load_timezone_names();
 
+    _global_zone_cache = std::make_unique<vectorized::ZoneList>();
+    TimezoneUtils::load_timezones_to_cache(*_global_zone_cache);
+
     ThreadPoolBuilder("SendBatchThreadPool")
             .set_min_threads(config::send_batch_thread_pool_thread_num)
             .set_max_threads(config::send_batch_thread_pool_thread_num)
diff --git a/be/src/util/timezone_utils.cpp b/be/src/util/timezone_utils.cpp
index 364bf53bbc..112dc74eb0 100644
--- a/be/src/util/timezone_utils.cpp
+++ b/be/src/util/timezone_utils.cpp
@@ -14,13 +14,18 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 #include "util/timezone_utils.h"
 
 #include <cctz/civil_time.h>
 #include <cctz/time_zone.h>
+#include <fcntl.h>
+#include <glog/logging.h>
 #include <re2/stringpiece.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
 
 #include <boost/algorithm/string.hpp>
 #include <cctype>
@@ -68,6 +73,216 @@ void TimezoneUtils::load_timezone_names() {
         }
     }
 }
+
+namespace { // helpers used only in this file
+
+// Size in bytes of one on-disk ttinfo record (tzfile(5)): a 4-byte UT offset,
+// a 1-byte isdst flag and a 1-byte index into the designation (abbreviation) array.
+constexpr size_t TTINFO_SIZE = 6;
+
+// tzfile(5) stores integers in network (big-endian) byte order. Decoding byte by
+// byte is alignment-safe (no reinterpret_cast of possibly unaligned data) and does
+// not depend on the host's endianness.
+int32_t decode_be32(const uint8_t* p) {
+    const uint32_t value = (static_cast<uint32_t>(p[0]) << 24) |
+                           (static_cast<uint32_t>(p[1]) << 16) |
+                           (static_cast<uint32_t>(p[2]) << 8) | static_cast<uint32_t>(p[3]);
+    return static_cast<int32_t>(value);
+}
+
+// Bounds-checked forward cursor over an in-memory tzfile image. A read or skip that
+// would run past the end fails (returns false, consuming nothing), so truncated or
+// non-TZif files found under the zoneinfo directory cannot cause out-of-bounds reads.
+class TzFileReader {
+public:
+    TzFileReader(const uint8_t* data, size_t len) : _pos(data), _end(data + len) {}
+
+    size_t remaining() const { return static_cast<size_t>(_end - _pos); }
+    const uint8_t* position() const { return _pos; }
+
+    bool skip(size_t n) {
+        if (n > remaining()) {
+            return false;
+        }
+        _pos += n;
+        return true;
+    }
+
+    // Reads one big-endian header count; negative counts are rejected as malformed.
+    bool read_count(size_t* out) {
+        if (remaining() < 4) {
+            return false;
+        }
+        const int32_t value = decode_be32(_pos);
+        if (value < 0) {
+            return false;
+        }
+        *out = static_cast<size_t>(value);
+        _pos += 4;
+        return true;
+    }
+
+private:
+    const uint8_t* _pos;
+    const uint8_t* _end;
+};
+
+/*
+ * Parses one header + data block of a tzfile(5) image, following
+ * https://man7.org/linux/man-pages/man5/tzfile.5.html (update when the format changes),
+ * and registers every local time type designation (e.g. "CST") as a fixed-offset zone.
+ * Existing entries are never overwritten (emplace). `time_size` is the width of
+ * transition and leap-second times: 4 bytes in the version-1 block, 8 bytes in the
+ * version-2+ block. `version` receives the header's version byte.
+ * Returns false if the block is not well-formed.
+ */
+bool parse_tzif_block(vectorized::ZoneList& zone_list, TzFileReader& reader, size_t time_size,
+                      uint8_t* version) {
+    /* HEADER: magic "TZif"(4), version(1), reserved(15), then six 4-byte counts */
+    if (reader.remaining() < 5 || memcmp(reader.position(), "TZif", 4) != 0) [[unlikely]] {
+        return false;
+    }
+    *version = reader.position()[4];
+    size_t ut_count = 0;   // UT/local indicators
+    size_t std_count = 0;  // standard/wall indicators
+    size_t leap_count = 0; // leap-second records
+    size_t time_count = 0; // transition times
+    size_t type_count = 0; // local time types (ttinfo records)
+    size_t char_count = 0; // bytes of designation strings
+    if (!reader.skip(4 + 1 + 15) || !reader.read_count(&ut_count) ||
+        !reader.read_count(&std_count) || !reader.read_count(&leap_count) ||
+        !reader.read_count(&time_count) || !reader.read_count(&type_count) ||
+        !reader.read_count(&char_count)) {
+        return false;
+    }
+
+    /* DATA */
+    // transition times and their 1-byte local-time-type indices, which we don't need
+    if (!reader.skip(time_count * (time_size + 1))) {
+        return false;
+    }
+    const uint8_t* ttinfos = reader.position();
+    if (!reader.skip(type_count * TTINFO_SIZE)) {
+        return false;
+    }
+    const char* names = reinterpret_cast<const char*>(reader.position());
+    if (!reader.skip(char_count)) {
+        return false;
+    }
+    for (size_t i = 0; i < type_count; ++i) {
+        const uint8_t* entry = ttinfos + i * TTINFO_SIZE;
+        const int32_t utoff = decode_be32(entry); // seconds to add to UTC
+        const size_t name_index = entry[5];       // tt_desigidx
+        if (name_index >= char_count) {
+            continue; // malformed index: skip rather than read out of bounds
+        }
+        // designations are NUL-terminated; never scan past the designation array
+        const char* name = names + name_index;
+        const size_t max_len = char_count - name_index;
+        const void* nul = memchr(name, '\0', max_len);
+        const size_t name_len =
+                nul != nullptr ? static_cast<size_t>(static_cast<const char*>(nul) - name) : max_len;
+        zone_list.emplace(std::string(name, name_len),
+                          cctz::fixed_time_zone(cctz::seconds(utoff)));
+    }
+
+    // leap-second records (a time plus a 4-byte correction), then the standard/wall and
+    // UT/local indicators: not needed, but skipped so a following block can be parsed
+    return reader.skip(leap_count * (time_size + 4)) && reader.skip(std_count) &&
+           reader.skip(ut_count);
+}
+
+// Parses a whole tzfile(5) image into `zone_list`. Files of version 2 and later
+// (version byte '2', '3', '4', ...) repeat the header and data with 64-bit times
+// after the version-1 block; both blocks are read.
+// Returns false if the image is not a well-formed TZif file.
+bool parse_load_timezone(vectorized::ZoneList& zone_list, const uint8_t* data, size_t len) {
+    TzFileReader reader(data, len);
+    uint8_t version = 0;
+    if (!parse_tzif_block(zone_list, reader, 4, &version)) {
+        return false;
+    }
+    if (version < '2') { // version-1 files (version byte NUL) have a single block
+        return true;
+    }
+    uint8_t second_version = 0;
+    return parse_tzif_block(zone_list, reader, 8, &second_version);
+}
+
+// Reads the whole file at `path` into `out`. Returns false if the file cannot be
+// opened, stat'ed or read; the content of `out` is then unspecified.
+bool load_file_to_memory(const std::string& path, std::vector<uint8_t>* out) {
+    const int fd = open(path.c_str(), O_RDONLY | O_CLOEXEC);
+    if (fd < 0) {
+        return false;
+    }
+    bool ok = false;
+    struct stat st {};
+    if (fstat(fd, &st) == 0 && st.st_size >= 0) {
+        out->resize(static_cast<size_t>(st.st_size));
+        size_t done = 0;
+        ok = true;
+        while (done < out->size()) {
+            const ssize_t n = read(fd, out->data() + done, out->size() - done);
+            if (n <= 0) {
+                ok = (n == 0); // 0: the file shrank while reading; keep what we got
+                break;
+            }
+            done += static_cast<size_t>(n);
+        }
+        out->resize(done);
+    }
+    close(fd);
+    return ok;
+}
+
+} // namespace
+
+// Pre-populates `cache_list` with one fixed-offset zone per time zone designation
+// (abbreviation) found in the system tzdata: $TZDIR if set, else /usr/share/zoneinfo.
+// Unreadable or malformed files are skipped, and an unreadable directory only logs a
+// warning, so a host without tzdata cannot abort BE start-up with an exception.
+void TimezoneUtils::load_timezones_to_cache(vectorized::ZoneList& cache_list) {
+    // "CST" is used with different offsets by several zones; pin it to UTC+8 first,
+    // the scan below uses emplace and therefore never overwrites an existing entry.
+    cache_list["CST"] = cctz::fixed_time_zone(cctz::seconds(8 * 3600));
+
+    std::string base_str = "/usr/share/zoneinfo"; // default
+    // honour $TZDIR when it is set and non-empty
+    const char* tzdir_env = std::getenv("TZDIR");
+    if (tzdir_env != nullptr && *tzdir_env != '\0') {
+        base_str = tzdir_env;
+    }
+    base_str += '/';
+
+    const std::set<std::string> ignore_paths = {"posix", "right"}; // duplications
+
+    namespace fs = std::filesystem;
+    std::error_code ec;          // the non-throwing filesystem overloads report here
+    std::vector<uint8_t> buffer; // reused for every file
+    fs::recursive_directory_iterator it {base_str, fs::directory_options::skip_permission_denied,
+                                         ec};
+    for (; !ec && it != fs::end(it); it.increment(ec)) {
+        const auto& dir_entry = *it;
+        std::error_code type_ec; // an entry that cannot be stat'ed is simply skipped
+        if (dir_entry.is_regular_file(type_ec)) {
+            if (load_file_to_memory(dir_entry.path().string(), &buffer)) {
+                parse_load_timezone(cache_list, buffer.data(), buffer.size());
+            }
+        } else if (dir_entry.is_directory(type_ec) &&
+                   ignore_paths.contains(dir_entry.path().filename().string())) {
+            it.disable_recursion_pending();
+        }
+    }
+    if (ec) {
+        LOG(WARNING) << "Failed to scan timezone directory " << base_str << ": "
+                     << ec.message();
+    }
+
+    cache_list.erase("LMT"); // local mean time for every timezone
+    LOG(INFO) << "Read " << cache_list.size() << " timezones.";
+}
+
 bool TimezoneUtils::find_cctz_time_zone(const std::string& timezone, cctz::time_zone& ctz) {
     auto timezone_lower = boost::algorithm::to_lower_copy(timezone);
     re2::StringPiece value;
@@ -121,6 +336,7 @@ bool TimezoneUtils::find_cctz_time_zone(const std::string& timezone, cctz::time_
         } else {
             auto it = timezone_names_map_.find(timezone_lower);
             if (it == timezone_names_map_.end()) {
+                VLOG_DEBUG << "Illegal timezone " << timezone_lower;
                 return false;
             }
             tz_parsed = cctz::load_time_zone(it->second, &ctz);
diff --git a/be/src/util/timezone_utils.h b/be/src/util/timezone_utils.h
index d9e5ee82d8..0f3a6dcc38 100644
--- a/be/src/util/timezone_utils.h
+++ b/be/src/util/timezone_utils.h
@@ -29,12 +29,16 @@ class time_zone;
 
 namespace doris {
 
+namespace vectorized {
+using ZoneList = std::unordered_map<std::string, cctz::time_zone>;
+}
+
 class TimezoneUtils {
 public:
     static void load_timezone_names();
+    static void load_timezones_to_cache(vectorized::ZoneList& cache_list);
     static bool find_cctz_time_zone(const std::string& timezone, 
cctz::time_zone& ctz);
 
-public:
     static const std::string default_time_zone;
 
 private:
diff --git a/be/src/vec/functions/function_cast.h 
b/be/src/vec/functions/function_cast.h
index 1907ad5988..e0e7f8d0fe 100644
--- a/be/src/vec/functions/function_cast.h
+++ b/be/src/vec/functions/function_cast.h
@@ -43,6 +43,7 @@
 // IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/status.h"
+#include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
 #include "udf/udf.h"
 #include "util/jsonb_document.h"
@@ -124,13 +125,13 @@ struct TimeCast {
     // '300' -> 00:03:00 '20:23' ->  20:23:00 '20:23:24' -> 20:23:24
     template <typename T>
     static bool try_parse_time(char* s, size_t len, T& x, const 
cctz::time_zone& local_time_zone,
-                               ZoneList& time_zone_cache) {
+                               ZoneList& time_zone_cache, std::shared_mutex& 
cache_lock) {
         /// TODO: Maybe we can move Timecast to the io_helper.
         if (try_as_time(s, len, x, local_time_zone)) {
             return true;
         } else {
             if (VecDateTimeValue dv {};
-                dv.from_date_str(s, len, local_time_zone, time_zone_cache)) {
+                dv.from_date_str(s, len, local_time_zone, time_zone_cache, 
&cache_lock)) {
                 // can be parse as a datetime
                 x = dv.hour() * 3600 + dv.minute() * 60 + dv.second();
                 return true;
@@ -852,6 +853,7 @@ struct NameToDateTime {
 template <typename DataType, typename Additions = void*, typename FromDataType 
= void*>
 bool try_parse_impl(typename DataType::FieldType& x, ReadBuffer& rb,
                     const cctz::time_zone& local_time_zone, ZoneList& 
time_zone_cache,
+                    std::shared_mutex& cache_lock,
                     Additions additions [[maybe_unused]] = Additions()) {
     if constexpr (IsDateTimeType<DataType>) {
         return try_read_datetime_text(x, rb, local_time_zone, time_zone_cache);
@@ -862,12 +864,13 @@ bool try_parse_impl(typename DataType::FieldType& x, 
ReadBuffer& rb,
     }
 
     if constexpr (IsDateV2Type<DataType>) {
-        return try_read_date_v2_text(x, rb, local_time_zone, time_zone_cache);
+        return try_read_date_v2_text(x, rb, local_time_zone, time_zone_cache, 
cache_lock);
     }
 
     if constexpr (IsDateTimeV2Type<DataType>) {
         UInt32 scale = additions;
-        return try_read_datetime_v2_text(x, rb, local_time_zone, 
time_zone_cache, scale);
+        return try_read_datetime_v2_text(x, rb, local_time_zone, 
time_zone_cache, cache_lock,
+                                         scale);
     }
 
     if constexpr (std::is_same_v<DataTypeString, FromDataType> &&
@@ -876,7 +879,8 @@ bool try_parse_impl(typename DataType::FieldType& x, 
ReadBuffer& rb,
         auto len = rb.count();
         auto s = rb.position();
         rb.position() = rb.end(); // make is_all_read = true
-        auto ret = TimeCast::try_parse_time(s, len, x, local_time_zone, 
time_zone_cache);
+        auto ret =
+                TimeCast::try_parse_time(s, len, x, local_time_zone, 
time_zone_cache, cache_lock);
         x *= (1000 * 1000);
         return ret;
     }
@@ -1326,10 +1330,8 @@ struct ConvertThroughParsing {
                                             ColumnDecimal<ToFieldType>, 
ColumnVector<ToFieldType>>;
 
         // For datelike type, only from FunctionConvertFromString. So we can 
use its' context。
-        auto convert_ctx = reinterpret_cast<ConvertTzCtx*>(
-                
context->get_function_state(FunctionContext::FunctionStateScope::THREAD_LOCAL));
-        ZoneList time_zone_cache_;
-        auto& time_zone_cache = convert_ctx ? convert_ctx->time_zone_cache : 
time_zone_cache_;
+        ZoneList& time_zone_cache = 
context->state()->exec_env()->global_zone_cache();
+        std::shared_mutex& cache_lock = 
context->state()->exec_env()->zone_cache_rw_lock();
 
         const IColumn* col_from = 
block.get_by_position(arguments[0]).column.get();
         const ColumnString* col_from_string = 
check_and_get_column<ColumnString>(col_from);
@@ -1379,18 +1381,19 @@ struct ConvertThroughParsing {
 
             bool parsed;
             if constexpr (IsDataTypeDecimal<ToDataType>) {
-                parsed = try_parse_impl<ToDataType>(vec_to[i], read_buffer,
-                                                    
context->state()->timezone_obj(),
-                                                    time_zone_cache, 
vec_to.get_scale());
+                parsed = try_parse_impl<ToDataType>(
+                        vec_to[i], read_buffer, 
context->state()->timezone_obj(), time_zone_cache,
+                        cache_lock, vec_to.get_scale());
             } else if constexpr (IsDataTypeDateTimeV2<ToDataType>) {
                 auto type = check_and_get_data_type<DataTypeDateTimeV2>(
                         block.get_by_position(result).type.get());
                 parsed = try_parse_impl<ToDataType>(vec_to[i], read_buffer,
                                                     
context->state()->timezone_obj(),
-                                                    time_zone_cache, 
type->get_scale());
+                                                    time_zone_cache, 
cache_lock, type->get_scale());
             } else {
                 parsed = try_parse_impl<ToDataType, void*, FromDataType>(
-                        vec_to[i], read_buffer, 
context->state()->timezone_obj(), time_zone_cache);
+                        vec_to[i], read_buffer, 
context->state()->timezone_obj(), time_zone_cache,
+                        cache_lock);
             }
             (*vec_null_map_to)[i] = !parsed || !is_all_read(read_buffer);
             current_offset = next_offset;
@@ -1427,14 +1430,6 @@ public:
 
     ColumnNumbers get_arguments_that_are_always_constant() const override { 
return {1}; }
 
-    Status open(FunctionContext* context, FunctionContext::FunctionStateScope 
scope) override {
-        if (scope != FunctionContext::THREAD_LOCAL) {
-            return Status::OK();
-        }
-        context->set_function_state(scope, std::make_unique<ConvertTzCtx>());
-        return Status::OK();
-    }
-
     // This function should not be called for get DateType Ptr
     // using the FunctionCast::get_return_type_impl
     DataTypePtr get_return_type_impl(const ColumnsWithTypeAndName& arguments) 
const override {
@@ -1442,8 +1437,9 @@ public:
         if constexpr (IsDataTypeDecimal<ToDataType>) {
             LOG(FATAL) << "Someting wrong with toDecimalNNOrZero() or 
toDecimalNNOrNull()";
 
-        } else
+        } else {
             res = std::make_shared<ToDataType>();
+        }
 
         return res;
     }
diff --git a/be/src/vec/functions/function_convert_tz.h 
b/be/src/vec/functions/function_convert_tz.h
index 7c6d0be442..8ff3505aca 100644
--- a/be/src/vec/functions/function_convert_tz.h
+++ b/be/src/vec/functions/function_convert_tz.h
@@ -23,11 +23,15 @@
 
 #include <map>
 #include <memory>
+#include <mutex>
+#include <shared_mutex>
 #include <string>
 #include <type_traits>
 #include <utility>
 
 #include "common/status.h"
+#include "runtime/exec_env.h"
+#include "runtime/runtime_state.h"
 #include "udf/udf.h"
 #include "util/binary_cast.hpp"
 #include "util/timezone_utils.h"
@@ -65,9 +69,7 @@ class DateV2Value;
 
 namespace doris::vectorized {
 
-struct ConvertTzCtx {
-    ZoneList time_zone_cache;
-};
+using ZoneList = std::unordered_map<std::string, cctz::time_zone>;
 
 template <typename DateValueType, typename ArgType>
 struct ConvertTZImpl {
@@ -90,10 +92,8 @@ struct ConvertTZImpl {
                         const ColumnString* from_tz_column, const 
ColumnString* to_tz_column,
                         ReturnColumnType* result_column, NullMap& 
result_null_map,
                         size_t input_rows_count) {
-        auto convert_ctx = reinterpret_cast<ConvertTzCtx*>(
-                
context->get_function_state(FunctionContext::FunctionStateScope::THREAD_LOCAL));
-        ZoneList time_zone_cache_;
-        auto& time_zone_cache = convert_ctx ? convert_ctx->time_zone_cache : 
time_zone_cache_;
+        ZoneList& time_zone_cache = 
context->state()->exec_env()->global_zone_cache();
+        std::shared_mutex& cache_lock = 
context->state()->exec_env()->zone_cache_rw_lock();
         for (size_t i = 0; i < input_rows_count; i++) {
             if (result_null_map[i]) {
                 result_column->insert_default();
@@ -101,8 +101,8 @@ struct ConvertTZImpl {
             }
             auto from_tz = from_tz_column->get_data_at(i).to_string();
             auto to_tz = to_tz_column->get_data_at(i).to_string();
-            execute_inner_loop(date_column, time_zone_cache, from_tz, to_tz, 
result_column,
-                               result_null_map, i);
+            execute_inner_loop(date_column, time_zone_cache, cache_lock, 
from_tz, to_tz,
+                               result_column, result_null_map, i);
         }
     }
 
@@ -110,10 +110,8 @@ struct ConvertTZImpl {
                                  const ColumnString* from_tz_column,
                                  const ColumnString* to_tz_column, 
ReturnColumnType* result_column,
                                  NullMap& result_null_map, size_t 
input_rows_count) {
-        auto convert_ctx = reinterpret_cast<ConvertTzCtx*>(
-                
context->get_function_state(FunctionContext::FunctionStateScope::THREAD_LOCAL));
-        ZoneList time_zone_cache_;
-        auto& time_zone_cache = convert_ctx ? convert_ctx->time_zone_cache : 
time_zone_cache_;
+        ZoneList& time_zone_cache = 
context->state()->exec_env()->global_zone_cache();
+        std::shared_mutex& cache_lock = 
context->state()->exec_env()->zone_cache_rw_lock();
 
         auto from_tz = from_tz_column->get_data_at(0).to_string();
         auto to_tz = to_tz_column->get_data_at(0).to_string();
@@ -122,33 +120,46 @@ struct ConvertTZImpl {
                 result_column->insert_default();
                 continue;
             }
-            execute_inner_loop(date_column, time_zone_cache, from_tz, to_tz, result_column,
-                               result_null_map, i);
+            execute_inner_loop(date_column, time_zone_cache, cache_lock, from_tz, to_tz,
+                               result_column, result_null_map, i);
         }
     }
 
     static void execute_inner_loop(const ColumnType* date_column, ZoneList& time_zone_cache,
-                                   const std::string& from_tz, const std::string& to_tz,
-                                   ReturnColumnType* result_column, NullMap& result_null_map,
-                                   const size_t index_now) {
+                                   std::shared_mutex& cache_lock, const std::string& from_tz,
+                                   const std::string& to_tz, ReturnColumnType* result_column,
+                                   NullMap& result_null_map, const size_t index_now) {
         DateValueType ts_value =
                 binary_cast<NativeType, DateValueType>(date_column->get_element(index_now));
         int64_t timestamp;
 
+        cache_lock.lock_shared();
         if (time_zone_cache.find(from_tz) == time_zone_cache.cend()) {
+            cache_lock.unlock_shared();
+            std::unique_lock<std::shared_mutex> lock_(cache_lock);
+            //TODO: the lock upgrade could be done in find_... function only when we push value into the hashmap
             if (!TimezoneUtils::find_cctz_time_zone(from_tz, time_zone_cache[from_tz])) {
+                time_zone_cache.erase(from_tz);
                 result_null_map[index_now] = true;
                 result_column->insert_default();
                 return;
             }
+        } else {
+            cache_lock.unlock_shared();
         }
 
+        cache_lock.lock_shared();
         if (time_zone_cache.find(to_tz) == time_zone_cache.cend()) {
+            cache_lock.unlock_shared();
+            std::unique_lock<std::shared_mutex> lock_(cache_lock);
             if (!TimezoneUtils::find_cctz_time_zone(to_tz, time_zone_cache[to_tz])) {
+                time_zone_cache.erase(to_tz);
                 result_null_map[index_now] = true;
                 result_column->insert_default();
                 return;
             }
+        } else {
+            cache_lock.unlock_shared();
         }
 
         if (!ts_value.unix_timestamp(&timestamp, time_zone_cache[from_tz])) {
@@ -201,14 +212,6 @@ public:
 
     bool use_default_implementation_for_nulls() const override { return false; 
}
 
-    Status open(FunctionContext* context, FunctionContext::FunctionStateScope 
scope) override {
-        if (scope != FunctionContext::THREAD_LOCAL) {
-            return Status::OK();
-        }
-        context->set_function_state(scope, std::make_unique<ConvertTzCtx>());
-        return Status::OK();
-    }
-
     Status close(FunctionContext* context, FunctionContext::FunctionStateScope 
scope) override {
         return Status::OK();
     }
diff --git a/be/src/vec/io/io_helper.h b/be/src/vec/io/io_helper.h
index 4d8b69d1fd..547277bcc4 100644
--- a/be/src/vec/io/io_helper.h
+++ b/be/src/vec/io/io_helper.h
@@ -21,6 +21,7 @@
 #include <snappy/snappy.h>
 
 #include <iostream>
+#include <unordered_map>
 
 #include "common/exception.h"
 #include "util/binary_cast.hpp"
@@ -42,7 +43,7 @@ static constexpr size_t DEFAULT_MAX_STRING_SIZE = 1073741824; 
// 1GB
 static constexpr size_t DEFAULT_MAX_JSON_SIZE = 1073741824;   // 1GB
 static constexpr auto WRITE_HELPERS_MAX_INT_WIDTH = 40U;
 
-using ZoneList = std::map<std::string, cctz::time_zone>;
+using ZoneList = std::unordered_map<std::string, cctz::time_zone>;
 
 inline std::string int128_to_string(__int128_t value) {
     fmt::memory_buffer buffer;
@@ -337,10 +338,11 @@ bool read_date_v2_text_impl(T& x, ReadBuffer& buf) {
 
 template <typename T>
 bool read_date_v2_text_impl(T& x, ReadBuffer& buf, const cctz::time_zone& 
local_time_zone,
-                            ZoneList& time_zone_cache) {
+                            ZoneList& time_zone_cache, std::shared_mutex& 
cache_lock) {
     static_assert(std::is_same_v<UInt32, T>);
     auto dv = binary_cast<UInt32, DateV2Value<DateV2ValueType>>(x);
-    auto ans = dv.from_date_str(buf.position(), buf.count(), local_time_zone, 
time_zone_cache);
+    auto ans = dv.from_date_str(buf.position(), buf.count(), local_time_zone, 
time_zone_cache,
+                                &cache_lock);
 
     // only to match the is_all_read() check to prevent return null
     buf.position() = buf.end();
@@ -362,11 +364,12 @@ bool read_datetime_v2_text_impl(T& x, ReadBuffer& buf, 
UInt32 scale = -1) {
 
 template <typename T>
 bool read_datetime_v2_text_impl(T& x, ReadBuffer& buf, const cctz::time_zone& 
local_time_zone,
-                                ZoneList& time_zone_cache, UInt32 scale = -1) {
+                                ZoneList& time_zone_cache, std::shared_mutex& 
cache_lock,
+                                UInt32 scale = -1) {
     static_assert(std::is_same_v<UInt64, T>);
     auto dv = binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>(x);
-    auto ans =
-            dv.from_date_str(buf.position(), buf.count(), local_time_zone, 
time_zone_cache, scale);
+    auto ans = dv.from_date_str(buf.position(), buf.count(), local_time_zone, 
time_zone_cache,
+                                &cache_lock, scale);
 
     // only to match the is_all_read() check to prevent return null
     buf.position() = buf.end();
@@ -463,13 +466,15 @@ bool try_read_date_text(T& x, ReadBuffer& in, const 
cctz::time_zone& local_time_
 
 template <typename T>
 bool try_read_date_v2_text(T& x, ReadBuffer& in, const cctz::time_zone& 
local_time_zone,
-                           ZoneList& time_zone_cache) {
-    return read_date_v2_text_impl<T>(x, in, local_time_zone, time_zone_cache);
+                           ZoneList& time_zone_cache, std::shared_mutex& 
cache_lock) {
+    return read_date_v2_text_impl<T>(x, in, local_time_zone, time_zone_cache, 
cache_lock);
 }
 
 template <typename T>
 bool try_read_datetime_v2_text(T& x, ReadBuffer& in, const cctz::time_zone& 
local_time_zone,
-                               ZoneList& time_zone_cache, UInt32 scale) {
-    return read_datetime_v2_text_impl<T>(x, in, local_time_zone, 
time_zone_cache, scale);
+                               ZoneList& time_zone_cache, std::shared_mutex& 
cache_lock,
+                               UInt32 scale) {
+    return read_datetime_v2_text_impl<T>(x, in, local_time_zone, 
time_zone_cache, cache_lock,
+                                         scale);
 }
 } // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vdatetime_value.cpp 
b/be/src/vec/runtime/vdatetime_value.cpp
index f701e96f30..86d685a0b9 100644
--- a/be/src/vec/runtime/vdatetime_value.cpp
+++ b/be/src/vec/runtime/vdatetime_value.cpp
@@ -85,18 +85,19 @@ bool VecDateTimeValue::check_date(uint32_t year, uint32_t 
month, uint32_t day) {
 // YYYY-MM-DD HH-MM-DD.FFFFFF AM in default format
 // 0    1  2  3  4  5  6      7
 bool VecDateTimeValue::from_date_str(const char* date_str, int len) {
-    return from_date_str_base(date_str, len, nullptr, nullptr);
+    return from_date_str_base(date_str, len, nullptr, nullptr, nullptr);
 }
 //parse timezone to get offset
 bool VecDateTimeValue::from_date_str(const char* date_str, int len,
                                      const cctz::time_zone& local_time_zone,
-                                     ZoneList& time_zone_cache) {
-    return from_date_str_base(date_str, len, &local_time_zone, 
&time_zone_cache);
+                                     ZoneList& time_zone_cache, 
std::shared_mutex* cache_lock) {
+    return from_date_str_base(date_str, len, &local_time_zone, 
&time_zone_cache, cache_lock);
 }
 
 bool VecDateTimeValue::from_date_str_base(const char* date_str, int len,
                                           const cctz::time_zone* 
local_time_zone,
-                                          ZoneList* time_zone_cache) {
+                                          ZoneList* time_zone_cache,
+                                          std::shared_mutex* cache_lock) {
     const char* ptr = date_str;
     const char* end = date_str + len;
     // ONLY 2, 6 can follow by a space
@@ -165,13 +166,18 @@ bool VecDateTimeValue::from_date_str_base(const char* 
date_str, int len,
                 return false;
             }
             auto get_tz_offset = [&](const std::string& str_tz,
-                                     const cctz::time_zone* local_time_zone,
-                                     ZoneList* time_zone_cache) -> long {
-                // no lock needed because of the entity is of thread_local
+                                     const cctz::time_zone* local_time_zone) 
-> long {
+                cache_lock->lock_shared();
                 if (time_zone_cache->find(str_tz) == time_zone_cache->end()) { // not found
+                    cache_lock->unlock_shared();
+                    std::unique_lock<std::shared_mutex> lock_(*cache_lock);
+                    //TODO: the lock upgrade could be done in find_... function only when we push value into the hashmap
                     if (!TimezoneUtils::find_cctz_time_zone(str_tz, (*time_zone_cache)[str_tz])) {
+                        time_zone_cache->erase(str_tz);
                         throw Exception {ErrorCode::INVALID_ARGUMENT, ""};
                     }
+                } else {
+                    cache_lock->unlock_shared();
                 }
                 auto given = cctz::convert(cctz::civil_second {}, (*time_zone_cache)[str_tz]);
                 auto local = cctz::convert(cctz::civil_second {}, *local_time_zone);
@@ -179,8 +185,8 @@ bool VecDateTimeValue::from_date_str_base(const char* date_str, int len,
                 return std::chrono::duration_cast<std::chrono::seconds>(given - local).count();
             };
             try {
-                sec_offset = get_tz_offset(std::string {ptr, end}, local_time_zone,
-                                           time_zone_cache); // use the whole remain string
+                sec_offset = get_tz_offset(std::string {ptr, end},
+                                           local_time_zone); // use the whole remain string
             } catch ([[maybe_unused]] Exception& e) {
                 return false; // invalid format
             }
@@ -1951,19 +1957,20 @@ void DateV2Value<T>::format_datetime(uint32_t* date_val, bool* carry_bits) const
 // 0    1  2  3  4  5  6      7
 template <typename T>
 bool DateV2Value<T>::from_date_str(const char* date_str, int len, int scale /* = -1*/) {
-    return from_date_str_base(date_str, len, scale, nullptr, nullptr);
+    return from_date_str_base(date_str, len, scale, nullptr, nullptr, nullptr);
 }
 // when we parse
 template <typename T>
 bool DateV2Value<T>::from_date_str(const char* date_str, int len,
                                    const cctz::time_zone& local_time_zone,
-                                   ZoneList& time_zone_cache, int scale /* = -1*/) {
-    return from_date_str_base(date_str, len, scale, &local_time_zone, &time_zone_cache);
+                                   ZoneList& time_zone_cache, std::shared_mutex* cache_lock,
+                                   int scale /* = -1*/) {
+    return from_date_str_base(date_str, len, scale, &local_time_zone, &time_zone_cache, cache_lock);
 }
 template <typename T>
 bool DateV2Value<T>::from_date_str_base(const char* date_str, int len, int scale,
                                         const cctz::time_zone* local_time_zone,
-                                        ZoneList* time_zone_cache) {
+                                        ZoneList* time_zone_cache, std::shared_mutex* cache_lock) {
     const char* ptr = date_str;
     const char* end = date_str + len;
     // ONLY 2, 6 can follow by a space
@@ -2059,13 +2066,18 @@ bool DateV2Value<T>::from_date_str_base(const char* date_str, int len, int scale
                 return false;
             }
             auto get_tz_offset = [&](const std::string& str_tz,
-                                     const cctz::time_zone* local_time_zone,
-                                     ZoneList* time_zone_cache) -> long {
-                // no lock needed because of the entity is of thread_local
+                                     const cctz::time_zone* local_time_zone) -> long {
+                cache_lock->lock_shared();
                 if (time_zone_cache->find(str_tz) == time_zone_cache->end()) { // not found
+                    cache_lock->unlock_shared();
+                    std::unique_lock<std::shared_mutex> lock_(*cache_lock);
+                    //TODO: the lock upgrade could be done in find_... function only when we push value into the hashmap
                     if (!TimezoneUtils::find_cctz_time_zone(str_tz, (*time_zone_cache)[str_tz])) {
+                        time_zone_cache->erase(str_tz);
                         throw Exception {ErrorCode::INVALID_ARGUMENT, ""};
                     }
+                } else {
+                    cache_lock->unlock_shared();
                 }
                 auto given = cctz::convert(cctz::civil_second {}, (*time_zone_cache)[str_tz]);
                 auto local = cctz::convert(cctz::civil_second {}, *local_time_zone);
@@ -2073,8 +2085,8 @@ bool DateV2Value<T>::from_date_str_base(const char* date_str, int len, int scale
                 return std::chrono::duration_cast<std::chrono::seconds>(given - local).count();
             };
             try {
-                sec_offset = get_tz_offset(std::string {ptr, end}, local_time_zone,
-                                           time_zone_cache); // use the whole remain string
+                sec_offset = get_tz_offset(std::string {ptr, end},
+                                           local_time_zone); // use the whole remain string
             } catch ([[maybe_unused]] Exception& e) {
                 return false; // invalid format
             }
diff --git a/be/src/vec/runtime/vdatetime_value.h b/be/src/vec/runtime/vdatetime_value.h
index fa3638fb94..68b1b1ad58 100644
--- a/be/src/vec/runtime/vdatetime_value.h
+++ b/be/src/vec/runtime/vdatetime_value.h
@@ -34,6 +34,7 @@
 #include "util/hash_util.hpp"
 #include "util/time_lut.h"
 #include "util/timezone_utils.h"
+#include "vec/common/hash_table/phmap_fwd_decl.h"
 
 namespace cctz {
 class time_zone;
@@ -43,7 +44,7 @@ namespace doris {
 
 namespace vectorized {
 
-using ZoneList = std::map<std::string, cctz::time_zone>;
+using ZoneList = std::unordered_map<std::string, cctz::time_zone>;
 
 enum TimeUnit {
     MICROSECOND,
@@ -354,7 +355,7 @@ public:
     // 'YYYYMMDDTHHMMSS'
     bool from_date_str(const char* str, int len);
     bool from_date_str(const char* str, int len, const cctz::time_zone& local_time_zone,
-                       ZoneList& time_zone_cache);
+                       ZoneList& time_zone_cache, std::shared_mutex* cache_lock);
 
     // Construct Date/Datetime type value from int64_t value.
     // Return true if convert success. Otherwise return false.
@@ -693,7 +694,7 @@ private:
     char* to_time_buffer(char* to) const;
 
     bool from_date_str_base(const char* date_str, int len, const cctz::time_zone* local_time_zone,
-                            ZoneList* time_zone_cache);
+                            ZoneList* time_zone_cache, std::shared_mutex* cache_lock);
 
     int64_t to_date_int64() const;
     int64_t to_time_int64() const;
@@ -815,7 +816,7 @@ public:
     // 'YYYYMMDDTHHMMSS'
     bool from_date_str(const char* str, int len, int scale = -1);
     bool from_date_str(const char* str, int len, const cctz::time_zone& local_time_zone,
-                       ZoneList& time_zone_cache, int scale = -1);
+                       ZoneList& time_zone_cache, std::shared_mutex* cache_lock, int scale = -1);
 
     // Convert this value to string
     // this will check type to decide which format to convert
@@ -1179,7 +1180,8 @@ private:
                              bool disable_lut = false);
 
     bool from_date_str_base(const char* date_str, int len, int scale,
-                            const cctz::time_zone* local_time_zone, ZoneList* time_zone_cache);
+                            const cctz::time_zone* local_time_zone, ZoneList* time_zone_cache,
+                            std::shared_mutex* cache_lock);
 
     // Used to construct from int value
     int64_t standardize_timevalue(int64_t value);
diff --git a/be/test/vec/function/function_time_test.cpp b/be/test/vec/function/function_time_test.cpp
index a24ab3f35c..e6281bc8f4 100644
--- a/be/test/vec/function/function_time_test.cpp
+++ b/be/test/vec/function/function_time_test.cpp
@@ -539,22 +539,6 @@ TEST(VTimestampFunctionsTest, makedate_test) {
     check_function<DataTypeDate, true>(func_name, input_types, data_set);
 }
 
-TEST(VTimestampFunctionsTest, convert_tz_test) {
-    TimezoneUtils::load_timezone_names();
-    std::string func_name = "convert_tz";
-
-    InputTypeSet input_types = {TypeIndex::DateTime, TypeIndex::String, TypeIndex::String};
-
-    DataSet data_set = {
-            {{DATETIME("2019-08-01 13:21:03"), STRING("Asia/Shanghai"),
-              STRING("America/Los_Angeles")},
-             str_to_date_time("2019-07-31 22:21:03", true)},
-            {{DATETIME("2019-08-01 13:21:03"), STRING("+08:00"), STRING("America/Los_Angeles")},
-             str_to_date_time("2019-07-31 22:21:03", true)}};
-
-    check_function<DataTypeDateTime, true>(func_name, input_types, data_set);
-}
-
 TEST(VTimestampFunctionsTest, weekday_test) {
     std::string func_name = "weekday";
 
@@ -1680,20 +1664,4 @@ TEST(VTimestampFunctionsTest, seconds_sub_v2_test) {
     }
 }
 
-TEST(VTimestampFunctionsTest, convert_tz_v2_test) {
-    TimezoneUtils::load_timezone_names();
-    std::string func_name = "convert_tz";
-
-    InputTypeSet input_types = {TypeIndex::DateTimeV2, TypeIndex::String, TypeIndex::String};
-
-    DataSet data_set = {
-            {{DATETIME("2019-08-01 13:21:03"), STRING("Asia/Shanghai"),
-              STRING("America/Los_Angeles")},
-             str_to_datetime_v2("2019-07-31 22:21:03", "%Y-%m-%d %H:%i:%s.%f")},
-            {{DATETIME("2019-08-01 13:21:03"), STRING("+08:00"), STRING("America/Los_Angeles")},
-             str_to_datetime_v2("2019-07-31 22:21:03", "%Y-%m-%d %H:%i:%s.%f")}};
-
-    check_function<DataTypeDateTimeV2, true>(func_name, input_types, data_set);
-}
-
 } // namespace doris::vectorized
diff --git a/regression-test/suites/datatype_p0/datetimev2/test_tz_streamload.groovy b/regression-test/suites/datatype_p0/datetimev2/test_tz_streamload.groovy
index 359f432bd5..6e32facc83 100644
--- a/regression-test/suites/datatype_p0/datetimev2/test_tz_streamload.groovy
+++ b/regression-test/suites/datatype_p0/datetimev2/test_tz_streamload.groovy
@@ -51,7 +51,7 @@ suite("test_tz_streamload") {
     streamLoad {
         table "${table1}"
         set 'column_separator', ','
-        set 'time_zone', '+02:00'
+        set 'timezone', '+02:00'
         file "test_tz_streamload.csv"
         time 20000
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to