This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new cd20617a1d3 [fix](be) Backport selected query engine fixes to
branch-4.1 (#64537)
cd20617a1d3 is described below
commit cd20617a1d339df561cb56f06f782a6d84581849
Author: Mryange <[email protected]>
AuthorDate: Sat Jun 20 22:03:40 2026 +0800
[fix](be) Backport selected query engine fixes to branch-4.1 (#64537)
Related PRs:
- https://github.com/apache/doris/pull/63076
- https://github.com/apache/doris/pull/63810
- https://github.com/apache/doris/pull/63174
- https://github.com/apache/doris/pull/63827
- https://github.com/apache/doris/pull/63109
- https://github.com/apache/doris/pull/63356
- https://github.com/apache/doris/pull/63124
- https://github.com/apache/doris/pull/63365
Problem Summary:
This PR backports a clean batch of branch-4.1-compatible fixes and
refinements from master. The picked changes cover constant argument
handling in scalar functions, mixed const-column regressions, aggregate
state cleanup, TIMESTAMPTZ group_array serde, annotated mutex wrappers,
dry-run result serialization, TIMESTAMPTZ map_contains_entry, and
TIMESTAMPDIFF MICROSECOND support in Nereids.
All selected commits were cherry-picked with `-x` from master onto
`upstream/branch-4.1` in the order documented by the pick plan. The ANN
typed query vector PR was reverted from this batch because it depends on
missing COW typed column cast helpers in branch-4.1. The sort merge
fast-path change was also reverted because it changed branch-4.1
behavior in the inline-view window-function regression path.
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No testing (automated or manual) is needed. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add the documentation PR link here, e.g.
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For the reviewer who merges this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add the label of each branch this PR
should be picked into -->
---
be/src/common/thread_safety_annotations.h | 48 ++++++
be/src/core/column/column_const.h | 3 +-
be/src/exec/exchange/vdata_stream_mgr.cpp | 28 ++--
be/src/exec/exchange/vdata_stream_mgr.h | 14 +-
be/src/exec/runtime_filter/runtime_filter_mgr.cpp | 20 +--
be/src/exec/runtime_filter/runtime_filter_mgr.h | 9 +-
be/src/exec/sink/writer/vmysql_result_writer.cpp | 6 +
be/src/exprs/aggregate/aggregate_function.h | 54 ++++---
.../exprs/aggregate/aggregate_function_collect.h | 11 +-
.../exprs/aggregate/aggregate_function_distinct.h | 8 +-
be/src/exprs/aggregate/aggregate_function_map.h | 2 +-
be/src/exprs/aggregate/aggregate_function_map_v2.h | 2 +-
be/src/exprs/function/function_map.cpp | 135 ++---------------
be/src/exprs/function/function_quantile_state.cpp | 2 +
be/src/exprs/function/function_regexp.cpp | 20 ++-
be/src/exprs/function/uniform.cpp | 16 +-
be/test/core/column/column_const_test.cpp | 13 ++
.../data_type_serde/data_type_serde_mysql_test.cpp | 23 ++-
be/test/exec/pipeline/vdata_stream_recvr_test.cpp | 2 +-
.../aggregate_function_exception_test.cpp | 162 +++++++++++++++++++++
be/test/exprs/function/function_math_test.cpp | 58 ++++++++
.../function/function_quantile_state_test.cpp | 17 +++
be/test/exprs/function/function_string_test.cpp | 16 ++
.../rules/analysis/DatetimeFunctionBinder.java | 5 +-
.../trees/expressions/literal/Interval.java | 1 +
.../rules/analysis/DatetimeFunctionBinderTest.java | 10 ++
.../timestamptz/test_timestamptz_agg_functions.out | 3 +
.../test_timestamptz_map_contains_entry.out | 43 ++++++
.../data/nereids_syntax_p0/test_timestampdiff.out | 6 +
.../test_timestamptz_agg_functions.groovy | 37 +++++
.../test_timestamptz_map_contains_entry.groovy | 155 ++++++++++++++++++++
.../nereids_function_p0/scalar_function/U.groovy | 2 +
.../nereids_syntax_p0/test_timestampdiff.groovy | 28 ++++
33 files changed, 756 insertions(+), 203 deletions(-)
diff --git a/be/src/common/thread_safety_annotations.h
b/be/src/common/thread_safety_annotations.h
index 6cd8d4b0cae..6bbdb8ce654 100644
--- a/be/src/common/thread_safety_annotations.h
+++ b/be/src/common/thread_safety_annotations.h
@@ -22,6 +22,7 @@
#pragma once
#include <mutex>
+#include <shared_mutex>
#ifdef BE_TEST
namespace doris {
@@ -93,6 +94,27 @@ private:
std::mutex _mutex;
};
+// Annotated shared mutex wrapper for use with Clang thread safety analysis.
+// Wraps std::shared_mutex and provides both exclusive and shared capability
+// operations so GUARDED_BY / REQUIRES_SHARED / etc. can reference it.
+class CAPABILITY("mutex") AnnotatedSharedMutex {
+public:
+ void lock() ACQUIRE() { _mutex.lock(); }
+ void unlock() RELEASE() { _mutex.unlock(); }
+ bool try_lock() TRY_ACQUIRE(true) { return _mutex.try_lock(); }
+
+ void lock_shared() ACQUIRE_SHARED() { _mutex.lock_shared(); }
+ void unlock_shared() RELEASE_SHARED() { _mutex.unlock_shared(); }
+ bool try_lock_shared() TRY_ACQUIRE_SHARED(true) { return
_mutex.try_lock_shared(); }
+
+ // Access the underlying std::shared_mutex (e.g., for
std::condition_variable_any).
+ // Use with care — this bypasses thread safety annotations.
+ std::shared_mutex& native_handle() { return _mutex; }
+
+private:
+ std::shared_mutex _mutex;
+};
+
// RAII scoped lock guard annotated for thread safety analysis.
// In BE_TEST builds, injects a random sleep before acquiring and after
// releasing the lock to exercise concurrent code paths.
@@ -119,6 +141,32 @@ private:
MutexType& _mu;
};
+// RAII scoped shared lock guard annotated for thread safety analysis.
+// In BE_TEST builds, injects a random sleep before acquiring and after
+// releasing the lock to exercise concurrent code paths.
+template <typename MutexType>
+class SCOPED_CAPABILITY SharedLockGuard {
+public:
+ explicit SharedLockGuard(MutexType& mu) ACQUIRE_SHARED(mu) : _mu(mu) {
+#ifdef BE_TEST
+ doris::mock_random_sleep();
+#endif
+ _mu.lock_shared();
+ }
+ ~SharedLockGuard() RELEASE() {
+ _mu.unlock_shared();
+#ifdef BE_TEST
+ doris::mock_random_sleep();
+#endif
+ }
+
+ SharedLockGuard(const SharedLockGuard&) = delete;
+ SharedLockGuard& operator=(const SharedLockGuard&) = delete;
+
+private:
+ MutexType& _mu;
+};
+
// RAII unique lock annotated for thread safety analysis.
// Supports manual lock/unlock while preserving capability tracking.
template <typename MutexType>
diff --git a/be/src/core/column/column_const.h
b/be/src/core/column/column_const.h
index 1d0a0d7e596..cf26588a6a5 100644
--- a/be/src/core/column/column_const.h
+++ b/be/src/core/column/column_const.h
@@ -126,7 +126,8 @@ public:
void resize(size_t new_size) override { s = new_size; }
MutableColumnPtr clone_resized(size_t new_size) const override {
- return ColumnConst::create(data, new_size, false, false);
+ auto cloned_data = data->clone_resized(data->size());
+ return ColumnConst::create(std::move(cloned_data), new_size, false,
false);
}
size_t size() const override { return s; }
diff --git a/be/src/exec/exchange/vdata_stream_mgr.cpp
b/be/src/exec/exchange/vdata_stream_mgr.cpp
index 17bab298c43..b3357f8d0b6 100644
--- a/be/src/exec/exchange/vdata_stream_mgr.cpp
+++ b/be/src/exec/exchange/vdata_stream_mgr.cpp
@@ -44,7 +44,7 @@ VDataStreamMgr::~VDataStreamMgr() {
// It will core during graceful stop.
auto receivers = std::vector<std::shared_ptr<VDataStreamRecvr>>();
{
- std::shared_lock l(_lock);
+ SharedLockGuard l(_lock);
auto receiver_iterator = _receiver_map.begin();
while (receiver_iterator != _receiver_map.end()) {
// Could not call close directly, because during close method, it
will remove itself
@@ -77,22 +77,16 @@ std::shared_ptr<VDataStreamRecvr>
VDataStreamMgr::create_recvr(
this, memory_used_counter, state, fragment_instance_id,
dest_node_id, num_senders,
is_merging, profile, data_queue_capacity));
uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
- std::unique_lock l(_lock);
+ LockGuard l(_lock);
_fragment_stream_set.insert(std::make_pair(fragment_instance_id,
dest_node_id));
_receiver_map.insert(std::make_pair(hash_value, recvr));
return recvr;
}
-Status VDataStreamMgr::find_recvr(const TUniqueId& fragment_instance_id,
PlanNodeId node_id,
- std::shared_ptr<VDataStreamRecvr>* res, bool
acquire_lock) {
+Status VDataStreamMgr::_find_recvr(uint32_t hash_value, const TUniqueId&
fragment_instance_id,
+ PlanNodeId node_id,
std::shared_ptr<VDataStreamRecvr>* res) {
VLOG_ROW << "looking up fragment_instance_id=" <<
print_id(fragment_instance_id)
<< ", node=" << node_id;
- uint32_t hash_value = get_hash_value(fragment_instance_id, node_id);
- // Create lock guard and not own lock currently and will lock conditionally
- std::shared_lock recvr_lock(_lock, std::defer_lock);
- if (acquire_lock) {
- recvr_lock.lock();
- }
std::pair<StreamMap::iterator, StreamMap::iterator> range =
_receiver_map.equal_range(hash_value);
while (range.first != range.second) {
@@ -108,6 +102,13 @@ Status VDataStreamMgr::find_recvr(const TUniqueId&
fragment_instance_id, PlanNod
node_id, print_id(fragment_instance_id));
}
+Status VDataStreamMgr::find_recvr(const TUniqueId& fragment_instance_id,
PlanNodeId node_id,
+ std::shared_ptr<VDataStreamRecvr>* res) {
+ SharedLockGuard recvr_lock(_lock);
+ uint32_t hash_value = get_hash_value(fragment_instance_id, node_id);
+ return _find_recvr(hash_value, fragment_instance_id, node_id, res);
+}
+
Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request,
::google::protobuf::Closure** done,
const int64_t wait_for_worker) {
@@ -199,7 +200,7 @@ Status VDataStreamMgr::deregister_recvr(const TUniqueId&
fragment_instance_id, P
<< ", node=" << node_id;
uint32_t hash_value = get_hash_value(fragment_instance_id, node_id);
{
- std::unique_lock l(_lock);
+ LockGuard l(_lock);
auto range = _receiver_map.equal_range(hash_value);
while (range.first != range.second) {
const std::shared_ptr<VDataStreamRecvr>& recvr =
range.first->second;
@@ -230,12 +231,13 @@ void VDataStreamMgr::cancel(const TUniqueId&
fragment_instance_id, Status exec_s
VLOG_QUERY << "cancelling all streams for fragment=" <<
print_id(fragment_instance_id);
std::vector<std::shared_ptr<VDataStreamRecvr>> recvrs;
{
- std::shared_lock l(_lock);
+ SharedLockGuard l(_lock);
FragmentStreamSet::iterator i =
_fragment_stream_set.lower_bound(std::make_pair(fragment_instance_id, 0));
while (i != _fragment_stream_set.end() && i->first ==
fragment_instance_id) {
std::shared_ptr<VDataStreamRecvr> recvr;
- WARN_IF_ERROR(find_recvr(i->first, i->second, &recvr, false), "");
+ uint32_t hash_value = get_hash_value(i->first, i->second);
+ WARN_IF_ERROR(_find_recvr(hash_value, i->first, i->second,
&recvr), "");
if (recvr == nullptr) {
// keep going but at least log it
std::stringstream err;
diff --git a/be/src/exec/exchange/vdata_stream_mgr.h
b/be/src/exec/exchange/vdata_stream_mgr.h
index 7bde8f3b4c0..7f35d627202 100644
--- a/be/src/exec/exchange/vdata_stream_mgr.h
+++ b/be/src/exec/exchange/vdata_stream_mgr.h
@@ -30,6 +30,7 @@
#include "common/be_mock_util.h"
#include "common/global_types.h"
#include "common/status.h"
+#include "common/thread_safety_annotations.h"
#include "runtime/runtime_profile.h"
namespace google {
@@ -58,8 +59,7 @@ public:
RuntimeProfile* profile, bool is_merging, size_t
data_queue_capacity);
MOCK_FUNCTION Status find_recvr(const TUniqueId& fragment_instance_id,
PlanNodeId node_id,
- std::shared_ptr<VDataStreamRecvr>* res,
- bool acquire_lock = true);
+ std::shared_ptr<VDataStreamRecvr>* res);
Status deregister_recvr(const TUniqueId& fragment_instance_id, PlanNodeId
node_id);
@@ -69,9 +69,9 @@ public:
void cancel(const TUniqueId& fragment_instance_id, Status exec_status);
private:
- std::shared_mutex _lock;
+ AnnotatedSharedMutex _lock;
using StreamMap = std::unordered_multimap<uint32_t,
std::shared_ptr<VDataStreamRecvr>>;
- StreamMap _receiver_map;
+ StreamMap _receiver_map GUARDED_BY(_lock);
struct ComparisonOp {
bool operator()(const std::pair<doris::TUniqueId, PlanNodeId>& a,
@@ -89,7 +89,11 @@ private:
}
};
using FragmentStreamSet = std::set<std::pair<TUniqueId, PlanNodeId>,
ComparisonOp>;
- FragmentStreamSet _fragment_stream_set;
+ FragmentStreamSet _fragment_stream_set GUARDED_BY(_lock);
+
+ Status _find_recvr(uint32_t hash_value, const TUniqueId&
fragment_instance_id,
+ PlanNodeId node_id, std::shared_ptr<VDataStreamRecvr>*
res)
+ REQUIRES_SHARED(_lock);
uint32_t get_hash_value(const TUniqueId& fragment_instance_id, PlanNodeId
node_id);
};
diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp
b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp
index 3615299fa38..49007d5c735 100644
--- a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp
+++ b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp
@@ -185,7 +185,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
auto filter_id = runtime_filter_desc->filter_id;
GlobalMergeContext* cnt_val;
{
- std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
+ LockGuard guard(_filter_map_mutex);
cnt_val = &_filter_map[filter_id]; // may inplace construct default
object
}
@@ -235,7 +235,7 @@ Status
RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr<Quer
auto filter_id = request->filter_id();
std::map<int, GlobalMergeContext>::iterator iter;
{
- std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
+ SharedLockGuard guard(_filter_map_mutex);
iter = _filter_map.find(filter_id);
if (iter == _filter_map.end()) {
return Status::InvalidArgument("unknown filter id {}",
@@ -243,12 +243,12 @@ Status
RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr<Quer
}
}
auto& cnt_val = iter->second;
- std::unique_lock<std::mutex> l(iter->second.mtx);
+ std::unique_lock<std::mutex> l(cnt_val.mtx);
// Discard stale-stage runtime filter size requests from old recursive CTE
rounds.
// Each round increments the stage counter; only messages matching the
current stage
// should be processed. This prevents old PFC's runtime filters from
corrupting
// the merge state of the new round's filters.
- if (request->stage() != iter->second.stage) {
+ if (request->stage() != cnt_val.stage) {
return Status::OK();
}
cnt_val.source_addrs.push_back(request->source_addr());
@@ -269,7 +269,7 @@ Status
RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr<Quer
}
auto sync_request = std::make_shared<PSyncFilterSizeRequest>();
- sync_request->set_stage(iter->second.stage);
+ sync_request->set_stage(cnt_val.stage);
auto closure = AutoReleaseClosure<PSyncFilterSizeRequest,
DummyBrpcCallback<PSyncFilterSizeResponse>>::
@@ -336,7 +336,7 @@ Status
RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryContext> q
auto filter_id = request->filter_id();
std::map<int, GlobalMergeContext>::iterator iter;
{
- std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
+ SharedLockGuard guard(_filter_map_mutex);
iter = _filter_map.find(filter_id);
VLOG_ROW << "recv filter id:" << request->filter_id() << " " <<
request->ShortDebugString();
if (iter == _filter_map.end()) {
@@ -347,9 +347,9 @@ Status
RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryContext> q
auto& cnt_val = iter->second;
bool is_ready = false;
{
- std::lock_guard<std::mutex> l(iter->second.mtx);
+ std::lock_guard<std::mutex> l(cnt_val.mtx);
// Discard stale-stage merge requests from old recursive CTE rounds.
- if (request->stage() != iter->second.stage) {
+ if (request->stage() != cnt_val.stage) {
return Status::OK();
}
if (cnt_val.merger == nullptr) {
@@ -492,7 +492,7 @@ Status RuntimeFilterMergeControllerEntity::reset_global_rf(
for (const auto& filter_id : filter_ids) {
GlobalMergeContext* cnt_val;
{
- std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
+ LockGuard guard(_filter_map_mutex);
cnt_val = &_filter_map[filter_id]; // may inplace construct
default object
}
RETURN_IF_ERROR(cnt_val->reset(query_ctx));
@@ -502,7 +502,7 @@ Status RuntimeFilterMergeControllerEntity::reset_global_rf(
std::string RuntimeFilterMergeControllerEntity::debug_string() {
std::string result = "RuntimeFilterMergeControllerEntity Info:\n";
- std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
+ SharedLockGuard guard(_filter_map_mutex);
for (const auto& [filter_id, ctx] : _filter_map) {
result += fmt::format("filter_id: {}, stage: {}, {}\n", filter_id,
ctx.stage,
ctx.merger->debug_string());
diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.h
b/be/src/exec/runtime_filter/runtime_filter_mgr.h
index f822e01196f..418f9aa41b7 100644
--- a/be/src/exec/runtime_filter/runtime_filter_mgr.h
+++ b/be/src/exec/runtime_filter/runtime_filter_mgr.h
@@ -27,12 +27,11 @@
#include <map>
#include <memory>
#include <mutex>
-#include <shared_mutex>
#include <unordered_set>
-#include <utility>
#include <vector>
#include "common/status.h"
+#include "common/thread_safety_annotations.h"
#include "util/uid_util.h"
namespace butil {
@@ -168,7 +167,7 @@ public:
std::string debug_string();
bool empty() {
- std::shared_lock<std::shared_mutex> read_lock(_filter_map_mutex);
+ SharedLockGuard read_lock(_filter_map_mutex);
return _filter_map.empty();
}
@@ -185,10 +184,10 @@ private:
int64_t merge_time, PUniqueId query_id, int
execution_timeout);
// protect _filter_map
- std::shared_mutex _filter_map_mutex;
+ AnnotatedSharedMutex _filter_map_mutex;
std::shared_ptr<MemTracker> _mem_tracker;
- std::map<int, GlobalMergeContext> _filter_map;
+ std::map<int, GlobalMergeContext> _filter_map
GUARDED_BY(_filter_map_mutex);
};
#include "common/compile_check_end.h"
} // namespace doris
diff --git a/be/src/exec/sink/writer/vmysql_result_writer.cpp
b/be/src/exec/sink/writer/vmysql_result_writer.cpp
index 4101f18db3c..7f98f626f3f 100644
--- a/be/src/exec/sink/writer/vmysql_result_writer.cpp
+++ b/be/src/exec/sink/writer/vmysql_result_writer.cpp
@@ -297,6 +297,12 @@ Status VMysqlResultWriter::write(RuntimeState* state,
Block& input_block) {
Block block;
RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
input_block, &block));
+
+ if (_is_dry_run) {
+ _written_rows += cast_set<int64_t>(block.rows());
+ return Status::OK();
+ }
+
const auto total_bytes = block.bytes();
if (total_bytes > config::thrift_max_message_size) [[unlikely]] {
diff --git a/be/src/exprs/aggregate/aggregate_function.h
b/be/src/exprs/aggregate/aggregate_function.h
index 0e07f74c1ae..94eaaa9ad72 100644
--- a/be/src/exprs/aggregate/aggregate_function.h
+++ b/be/src/exprs/aggregate/aggregate_function.h
@@ -479,19 +479,21 @@ public:
size_t num_rows) const override {
const Derived* derived = assert_cast<const Derived*>(this);
const auto size_of_data = derived->size_of_data();
- for (size_t i = 0; i != num_rows; ++i) {
- try {
+ size_t created_count = 0;
+ try {
+ for (size_t i = 0; i != num_rows; ++i) {
auto place = places + size_of_data * i;
VectorBufferReader buffer_reader(column->get_data_at(i));
derived->create(place);
+ ++created_count;
derived->deserialize(place, buffer_reader, arena);
- } catch (...) {
- for (int j = 0; j < i; ++j) {
- auto place = places + size_of_data * j;
- derived->destroy(place);
- }
- throw;
}
+ } catch (...) {
+ for (size_t j = 0; j < created_count; ++j) {
+ auto place = places + size_of_data * j;
+ derived->destroy(place);
+ }
+ throw;
}
}
@@ -502,19 +504,21 @@ public:
const auto size_of_data = derived->size_of_data();
const auto* column_string = assert_cast<const ColumnString*>(column);
- for (size_t i = 0; i != num_rows; ++i) {
- try {
+ size_t created_count = 0;
+ try {
+ for (size_t i = 0; i != num_rows; ++i) {
auto rhs_place = rhs + size_of_data * i;
VectorBufferReader
buffer_reader(column_string->get_data_at(i));
derived->create(rhs_place);
+ ++created_count;
derived->deserialize_and_merge(places[i] + offset, rhs_place,
buffer_reader, arena);
- } catch (...) {
- for (int j = 0; j < i; ++j) {
- auto place = rhs + size_of_data * j;
- derived->destroy(place);
- }
- throw;
}
+ } catch (...) {
+ for (size_t j = 0; j < created_count; ++j) {
+ auto place = rhs + size_of_data * j;
+ derived->destroy(place);
+ }
+ throw;
}
derived->destroy_vec(rhs, num_rows);
@@ -526,22 +530,24 @@ public:
const auto* derived = assert_cast<const Derived*>(this);
const auto size_of_data = derived->size_of_data();
const auto* column_string = assert_cast<const ColumnString*>(column);
- for (size_t i = 0; i != num_rows; ++i) {
- try {
+ size_t created_count = 0;
+ try {
+ for (size_t i = 0; i != num_rows; ++i) {
auto rhs_place = rhs + size_of_data * i;
VectorBufferReader
buffer_reader(column_string->get_data_at(i));
derived->create(rhs_place);
+ ++created_count;
if (places[i]) {
derived->deserialize_and_merge(places[i] + offset,
rhs_place, buffer_reader,
arena);
}
- } catch (...) {
- for (int j = 0; j < i; ++j) {
- auto place = rhs + size_of_data * j;
- derived->destroy(place);
- }
- throw;
}
+ } catch (...) {
+ for (size_t j = 0; j < created_count; ++j) {
+ auto place = rhs + size_of_data * j;
+ derived->destroy(place);
+ }
+ throw;
}
derived->destroy_vec(rhs, num_rows);
}
diff --git a/be/src/exprs/aggregate/aggregate_function_collect.h
b/be/src/exprs/aggregate/aggregate_function_collect.h
index 3f9c84f7dea..63a3c634822 100644
--- a/be/src/exprs/aggregate/aggregate_function_collect.h
+++ b/be/src/exprs/aggregate/aggregate_function_collect.h
@@ -17,6 +17,7 @@
#pragma once
+#include <cctz/time_zone.h>
#include <glog/logging.h>
#include <cstddef>
@@ -50,7 +51,7 @@ struct AggregateFunctionCollectSetData {
using ElementType = typename PrimitiveTypeTraits<T>::CppType;
using ColVecType = typename PrimitiveTypeTraits<T>::ColumnType;
using SelfType = AggregateFunctionCollectSetData;
- using Set = phmap::flat_hash_set<ElementType>;
+ using Set = doris::flat_hash_set<ElementType>;
Set data_set;
Int64 max_size = -1;
@@ -119,7 +120,7 @@ struct AggregateFunctionCollectSetData<T, HasLimit> {
using ElementType = StringRef;
using ColVecType = ColumnString;
using SelfType = AggregateFunctionCollectSetData<T, HasLimit>;
- using Set = phmap::flat_hash_set<ElementType>;
+ using Set = doris::flat_hash_set<ElementType>;
Set data_set;
Int64 max_size = -1;
@@ -343,6 +344,10 @@ struct AggregateFunctionCollectListData<T, HasLimit> {
buf.write_binary(size);
DataTypeSerDe::FormatOptions opt;
+ auto timezone = cctz::utc_time_zone();
+ opt.timezone = &timezone;
+ // TODO: Refactor this aggregate state serialization to avoid
+ // round-tripping through a human-readable string format.
auto tmp_str = ColumnString::create();
VectorBufferWriter tmp_buf(*tmp_str.get());
@@ -368,6 +373,8 @@ struct AggregateFunctionCollectListData<T, HasLimit> {
StringRef s;
DataTypeSerDe::FormatOptions opt;
+ auto timezone = cctz::utc_time_zone();
+ opt.timezone = &timezone;
for (size_t i = 0; i < size; i++) {
buf.read_binary(s);
Slice slice(s.data, s.size);
diff --git a/be/src/exprs/aggregate/aggregate_function_distinct.h
b/be/src/exprs/aggregate/aggregate_function_distinct.h
index 618d9b46f41..825e782f0cb 100644
--- a/be/src/exprs/aggregate/aggregate_function_distinct.h
+++ b/be/src/exprs/aggregate/aggregate_function_distinct.h
@@ -52,8 +52,8 @@ template <PrimitiveType T, bool stable>
struct AggregateFunctionDistinctSingleNumericData {
/// When creating, the hash table must be small.
using Container = std::conditional_t<
- stable, phmap::flat_hash_map<typename
PrimitiveTypeTraits<T>::CppType, uint32_t>,
- phmap::flat_hash_set<typename PrimitiveTypeTraits<T>::CppType>>;
+ stable, doris::flat_hash_map<typename
PrimitiveTypeTraits<T>::CppType, uint32_t>,
+ doris::flat_hash_set<typename PrimitiveTypeTraits<T>::CppType>>;
using Self = AggregateFunctionDistinctSingleNumericData<T, stable>;
Container data;
@@ -126,8 +126,8 @@ struct AggregateFunctionDistinctSingleNumericData {
template <bool stable>
struct AggregateFunctionDistinctGenericData {
/// When creating, the hash table must be small.
- using Container = std::conditional_t<stable,
phmap::flat_hash_map<StringRef, uint32_t>,
- phmap::flat_hash_set<StringRef,
StringRefHash>>;
+ using Container = std::conditional_t<stable,
doris::flat_hash_map<StringRef, uint32_t>,
+ doris::flat_hash_set<StringRef,
StringRefHash>>;
using Self = AggregateFunctionDistinctGenericData;
Container data;
diff --git a/be/src/exprs/aggregate/aggregate_function_map.h
b/be/src/exprs/aggregate/aggregate_function_map.h
index f9aff592503..a16bea7867c 100644
--- a/be/src/exprs/aggregate/aggregate_function_map.h
+++ b/be/src/exprs/aggregate/aggregate_function_map.h
@@ -35,7 +35,7 @@ namespace doris {
template <PrimitiveType K>
struct AggregateFunctionMapAggData {
using KeyType = typename PrimitiveTypeTraits<K>::CppType;
- using Map = phmap::flat_hash_map<StringRef, int64_t>;
+ using Map = doris::flat_hash_map<StringRef, int64_t>;
AggregateFunctionMapAggData() { throw
Exception(Status::FatalError("__builtin_unreachable")); }
diff --git a/be/src/exprs/aggregate/aggregate_function_map_v2.h
b/be/src/exprs/aggregate/aggregate_function_map_v2.h
index 3181b1ad426..1d821c486c6 100644
--- a/be/src/exprs/aggregate/aggregate_function_map_v2.h
+++ b/be/src/exprs/aggregate/aggregate_function_map_v2.h
@@ -33,7 +33,7 @@ namespace doris {
#include "common/compile_check_begin.h"
struct AggregateFunctionMapAggDataV2 {
- using Map = phmap::flat_hash_map<Field, int64_t>;
+ using Map = doris::flat_hash_map<Field, int64_t>;
AggregateFunctionMapAggDataV2() {
throw Exception(Status::FatalError("__builtin_unreachable"));
diff --git a/be/src/exprs/function/function_map.cpp
b/be/src/exprs/function/function_map.cpp
index ffe3e773b9f..54d01931b10 100644
--- a/be/src/exprs/function/function_map.cpp
+++ b/be/src/exprs/function/function_map.cpp
@@ -27,6 +27,7 @@
#include <tuple>
#include <utility>
+#include "common/exception.h"
#include "common/status.h"
#include "core/assert_cast.h"
#include "core/block/block.h"
@@ -561,124 +562,19 @@ private:
const ColumnArray::Offsets64& map_offsets,
const UInt8* map_row_nullmap, bool
search_is_const,
ColumnUInt8& result_matches) const {
- switch (type) {
- case TYPE_BOOLEAN:
- _execute_column_comparison<ColumnUInt8>(
- map_entry_column, map_entry_nullmap, search_column,
search_nullmap, map_offsets,
- map_row_nullmap, search_is_const, result_matches);
- break;
- case TYPE_TINYINT:
- _execute_column_comparison<ColumnInt8>(
- map_entry_column, map_entry_nullmap, search_column,
search_nullmap, map_offsets,
- map_row_nullmap, search_is_const, result_matches);
- break;
- case TYPE_SMALLINT:
- _execute_column_comparison<ColumnInt16>(
- map_entry_column, map_entry_nullmap, search_column,
search_nullmap, map_offsets,
- map_row_nullmap, search_is_const, result_matches);
- break;
- case TYPE_INT:
- _execute_column_comparison<ColumnInt32>(
- map_entry_column, map_entry_nullmap, search_column,
search_nullmap, map_offsets,
- map_row_nullmap, search_is_const, result_matches);
- break;
- case TYPE_BIGINT:
- _execute_column_comparison<ColumnInt64>(
- map_entry_column, map_entry_nullmap, search_column,
search_nullmap, map_offsets,
- map_row_nullmap, search_is_const, result_matches);
- break;
- case TYPE_LARGEINT:
- _execute_column_comparison<ColumnInt128>(
- map_entry_column, map_entry_nullmap, search_column,
search_nullmap, map_offsets,
- map_row_nullmap, search_is_const, result_matches);
- break;
- case TYPE_FLOAT:
- _execute_column_comparison<ColumnFloat32>(
- map_entry_column, map_entry_nullmap, search_column,
search_nullmap, map_offsets,
- map_row_nullmap, search_is_const, result_matches);
- break;
- case TYPE_DOUBLE:
- _execute_column_comparison<ColumnFloat64>(
- map_entry_column, map_entry_nullmap, search_column,
search_nullmap, map_offsets,
- map_row_nullmap, search_is_const, result_matches);
- break;
- case TYPE_DECIMAL32:
- _execute_column_comparison<ColumnDecimal32>(
- map_entry_column, map_entry_nullmap, search_column,
search_nullmap, map_offsets,
- map_row_nullmap, search_is_const, result_matches);
- break;
- case TYPE_DECIMAL64:
- _execute_column_comparison<ColumnDecimal64>(
- map_entry_column, map_entry_nullmap, search_column,
search_nullmap, map_offsets,
- map_row_nullmap, search_is_const, result_matches);
- break;
- case TYPE_DECIMAL128I:
- _execute_column_comparison<ColumnDecimal128V3>(
- map_entry_column, map_entry_nullmap, search_column,
search_nullmap, map_offsets,
- map_row_nullmap, search_is_const, result_matches);
- break;
- case TYPE_DECIMALV2:
- _execute_column_comparison<ColumnDecimal128V2>(
- map_entry_column, map_entry_nullmap, search_column,
search_nullmap, map_offsets,
- map_row_nullmap, search_is_const, result_matches);
- break;
- case TYPE_DECIMAL256:
- _execute_column_comparison<ColumnDecimal256>(
- map_entry_column, map_entry_nullmap, search_column,
search_nullmap, map_offsets,
- map_row_nullmap, search_is_const, result_matches);
- break;
- case TYPE_STRING:
- case TYPE_CHAR:
- case TYPE_VARCHAR:
- _execute_column_comparison<ColumnString>(
- map_entry_column, map_entry_nullmap, search_column,
search_nullmap, map_offsets,
- map_row_nullmap, search_is_const, result_matches);
- break;
- case TYPE_DATE:
- _execute_column_comparison<ColumnDate>(
- map_entry_column, map_entry_nullmap, search_column,
search_nullmap, map_offsets,
- map_row_nullmap, search_is_const, result_matches);
- break;
- case TYPE_DATETIME:
- _execute_column_comparison<ColumnDateTime>(
- map_entry_column, map_entry_nullmap, search_column,
search_nullmap, map_offsets,
- map_row_nullmap, search_is_const, result_matches);
- break;
- case TYPE_DATEV2:
- _execute_column_comparison<ColumnDateV2>(
- map_entry_column, map_entry_nullmap, search_column,
search_nullmap, map_offsets,
- map_row_nullmap, search_is_const, result_matches);
- break;
- case TYPE_DATETIMEV2:
- _execute_column_comparison<ColumnDateTimeV2>(
- map_entry_column, map_entry_nullmap, search_column,
search_nullmap, map_offsets,
- map_row_nullmap, search_is_const, result_matches);
- break;
- case TYPE_TIME:
- _execute_column_comparison<ColumnTime>(
- map_entry_column, map_entry_nullmap, search_column,
search_nullmap, map_offsets,
- map_row_nullmap, search_is_const, result_matches);
- break;
- case TYPE_TIMEV2:
- _execute_column_comparison<ColumnTimeV2>(
- map_entry_column, map_entry_nullmap, search_column,
search_nullmap, map_offsets,
- map_row_nullmap, search_is_const, result_matches);
- break;
- case TYPE_IPV4:
- _execute_column_comparison<ColumnIPv4>(
- map_entry_column, map_entry_nullmap, search_column,
search_nullmap, map_offsets,
- map_row_nullmap, search_is_const, result_matches);
- break;
- case TYPE_IPV6:
- _execute_column_comparison<ColumnIPv6>(
+ auto call = [&](const auto& type) -> bool {
+ using DispatchType = std::decay_t<decltype(type)>;
+
+ _execute_column_comparison<typename DispatchType::ColumnType>(
map_entry_column, map_entry_nullmap, search_column,
search_nullmap, map_offsets,
map_row_nullmap, search_is_const, result_matches);
- break;
- default:
- // We have done type check before dispatching, so this should not
happen
- DCHECK(false) << "Dispatching unsupported primitive type in " <<
get_name() << ": "
- << static_cast<int>(type);
- break;
+
+ return true;
+ };
+
+ if (!dispatch_switch_all(type, call)) {
+ throw doris::Exception(ErrorCode::INTERNAL_ERROR, "not support
type {}",
+ type_to_string(type));
}
}
@@ -867,10 +763,11 @@ private:
/*nan_direction_hint=*/1) == 0;
}
- // whether this function supports equality comparison for the given
primitive type
+ // whether this function supports equality comparison for the given
primitive type.
+ // Uses dispatch_switch_all as the single source of truth so any type
supported
+ // by the dispatch layer is automatically accepted here.
bool is_equality_comparison_supported(PrimitiveType type) const {
- return is_string_type(type) || is_number(type) || is_date_type(type) ||
- is_time_type(type) || is_ip(type);
+ return dispatch_switch_all(type, [](const auto&) { return true; });
}
};
diff --git a/be/src/exprs/function/function_quantile_state.cpp
b/be/src/exprs/function/function_quantile_state.cpp
index a0edb82dbf9..af1b80822f0 100644
--- a/be/src/exprs/function/function_quantile_state.cpp
+++ b/be/src/exprs/function/function_quantile_state.cpp
@@ -169,6 +169,8 @@ public:
bool use_default_implementation_for_nulls() const override { return false;
}
+ ColumnNumbers get_arguments_that_are_always_constant() const override {
return {1}; }
+
Status execute_impl(FunctionContext* context, Block& block, const
ColumnNumbers& arguments,
uint32_t result, size_t input_rows_count) const
override {
auto res_data_column = ColumnFloat64::create();
diff --git a/be/src/exprs/function/function_regexp.cpp
b/be/src/exprs/function/function_regexp.cpp
index 8a9871c8eb3..fd642e8abf4 100644
--- a/be/src/exprs/function/function_regexp.cpp
+++ b/be/src/exprs/function/function_regexp.cpp
@@ -34,6 +34,7 @@
#include "core/block/column_with_type_and_name.h"
#include "core/column/column.h"
#include "core/column/column_const.h"
+#include "core/column/column_execute_util.h"
#include "core/column/column_nullable.h"
#include "core/column/column_string.h"
#include "core/column/column_vector.h"
@@ -189,23 +190,26 @@ struct RegexpExtractEngine {
};
struct RegexpCountImpl {
+ using StringColumnView = ColumnView<TYPE_STRING>;
+
static void execute_impl(FunctionContext* context, ColumnPtr
argument_columns[],
size_t input_rows_count, ColumnInt32::Container&
result_data) {
- const auto* str_col =
check_and_get_column<ColumnString>(argument_columns[0].get());
- const auto* pattern_col =
check_and_get_column<ColumnString>(argument_columns[1].get());
- for (int i = 0; i < input_rows_count; ++i) {
+ auto str_col = StringColumnView::create(argument_columns[0]);
+ auto pattern_col = StringColumnView::create(argument_columns[1]);
+ for (size_t i = 0; i < input_rows_count; ++i) {
+ DCHECK(!str_col.is_null_at(i));
+ DCHECK(!pattern_col.is_null_at(i));
result_data[i] = _execute_inner_loop(context, str_col,
pattern_col, i);
}
}
- static int _execute_inner_loop(FunctionContext* context, const
ColumnString* str_col,
- const ColumnString* pattern_col, const
size_t index_now) {
+ static int _execute_inner_loop(FunctionContext* context, const
StringColumnView& str_col,
+ const StringColumnView& pattern_col, const
size_t index_now) {
re2::RE2* re = reinterpret_cast<re2::RE2*>(
context->get_function_state(FunctionContext::THREAD_LOCAL));
std::unique_ptr<re2::RE2> scoped_re;
if (re == nullptr) {
std::string error_str;
- DCHECK(pattern_col);
- const auto& pattern =
pattern_col->get_data_at(index_check_const(index_now, false));
+ const auto pattern = pattern_col.value_at(index_now);
bool st = StringFunctions::compile_regex(pattern, &error_str,
StringRef(), StringRef(),
scoped_re);
if (!st) {
@@ -216,7 +220,7 @@ struct RegexpCountImpl {
re = scoped_re.get();
}
- const auto& str = str_col->get_data_at(index_now);
+ const auto str = str_col.value_at(index_now);
int count = 0;
size_t pos = 0;
while (pos < str.size) {
diff --git a/be/src/exprs/function/uniform.cpp
b/be/src/exprs/function/uniform.cpp
index 3bd1e139e15..e639df7a295 100644
--- a/be/src/exprs/function/uniform.cpp
+++ b/be/src/exprs/function/uniform.cpp
@@ -30,6 +30,7 @@
#include "core/block/block.h"
#include "core/block/column_numbers.h"
#include "core/column/column.h"
+#include "core/column/column_execute_util.h"
#include "core/column/column_vector.h"
#include "core/data_type/data_type_number.h" // IWYU pragma: keep
#include "core/data_type/primitive_type.h"
@@ -74,12 +75,12 @@ struct UniformIntImpl {
"uniform's min should be less than max, but got [{}, {})",
min, max);
}
- // Get gen column (seed values)
- const auto& gen_column = block.get_by_position(arguments[2]).column;
+ auto gen_column =
+
ColumnView<TYPE_BIGINT>::create(block.get_by_position(arguments[2]).column);
for (int i = 0; i < input_rows_count; i++) {
// Use gen value as seed for each row
- auto seed = (*gen_column)[i].get<TYPE_BIGINT>();
+ auto seed = gen_column.value_at(i);
std::mt19937_64 generator(seed);
std::uniform_int_distribution<int64_t> distribution(min, max);
res_data[i] = distribution(generator);
@@ -123,11 +124,12 @@ struct UniformDoubleImpl {
}
// Get gen column (seed values)
- const auto& gen_column = block.get_by_position(arguments[2]).column;
+ auto gen_column =
+
ColumnView<TYPE_BIGINT>::create(block.get_by_position(arguments[2]).column);
for (int i = 0; i < input_rows_count; i++) {
// Use gen value as seed for each row
- auto seed = (*gen_column)[i].get<TYPE_BIGINT>();
+ auto seed = gen_column.value_at(i);
std::mt19937_64 generator(seed);
std::uniform_real_distribution<double> distribution(min, max);
res_data[i] = distribution(generator);
@@ -146,6 +148,8 @@ public:
static FunctionPtr create() { return
std::make_shared<FunctionUniform<Impl>>(); }
String get_name() const override { return name; }
+ bool use_default_implementation_for_constants() const override { return
false; }
+
size_t get_number_of_arguments() const override {
return get_variadic_argument_types_impl().size();
}
@@ -158,6 +162,8 @@ public:
return Impl::get_variadic_argument_types();
}
+ ColumnNumbers get_arguments_that_are_always_constant() const override {
return {0, 1}; }
+
Status open(FunctionContext* context, FunctionContext::FunctionStateScope
scope) override {
// init_function_context do set_constant_cols for FRAGMENT_LOCAL scope
if (scope == FunctionContext::FRAGMENT_LOCAL) {
diff --git a/be/test/core/column/column_const_test.cpp
b/be/test/core/column/column_const_test.cpp
index f6f81ec3aab..e9f57df213b 100644
--- a/be/test/core/column/column_const_test.cpp
+++ b/be/test/core/column/column_const_test.cpp
@@ -41,6 +41,19 @@ TEST(ColumnConstTest, TestCreate) {
EXPECT_TRUE(!is_column_const(column_const2->get_data_column()));
}
+TEST(ColumnConstTest, clone_resized_clones_nested_data) {
+ auto column_data = ColumnHelper::create_column<DataTypeInt64>({7});
+ auto column_const = ColumnConst::create(column_data, 3);
+
+ auto cloned = column_const->clone_resized(5);
+ const auto& cloned_const = assert_cast<const ColumnConst&>(*cloned);
+
+ EXPECT_EQ(cloned_const.size(), 5);
+ EXPECT_EQ(cloned_const.get_data_column_ptr()->size(), 1);
+ EXPECT_EQ(cloned_const.get_data_column().get_int(0), 7);
+ EXPECT_NE(column_const->get_data_column_ptr().get(),
cloned_const.get_data_column_ptr().get());
+}
+
TEST(ColumnConstTest, TestFilter) {
{
auto column_data = ColumnHelper::create_column<DataTypeInt64>({7});
diff --git a/be/test/core/data_type_serde/data_type_serde_mysql_test.cpp
b/be/test/core/data_type_serde/data_type_serde_mysql_test.cpp
index d0a6cbdbbae..e8f289bbf54 100644
--- a/be/test/core/data_type_serde/data_type_serde_mysql_test.cpp
+++ b/be/test/core/data_type_serde/data_type_serde_mysql_test.cpp
@@ -77,6 +77,10 @@ class TestBlockSerializer final : public
MySQLResultBlockBuffer {
public:
TestBlockSerializer(RuntimeState* state) : MySQLResultBlockBuffer(state) {}
~TestBlockSerializer() override = default;
+ size_t queue_size() {
+ std::lock_guard<std::mutex> l(_lock);
+ return _result_batch_queue.size();
+ }
std::shared_ptr<TFetchDataResult> get_block() {
std::lock_guard<std::mutex> l(_lock);
DCHECK_EQ(_result_batch_queue.size(), 1);
@@ -86,7 +90,7 @@ public:
}
};
-void serialize_and_deserialize_mysql_test() {
+void serialize_and_deserialize_mysql_test(bool dry_run) {
Block block;
// create_descriptor_tablet();
std::vector<std::tuple<std::string, FieldType, int, PrimitiveType, bool>>
cols {
@@ -317,12 +321,25 @@ void serialize_and_deserialize_mysql_test() {
auto serializer = std::make_shared<TestBlockSerializer>(&state);
VMysqlResultWriter mysql_writer(serializer, _output_vexpr_ctxs, nullptr,
false);
- Status st = mysql_writer.write(&runtime_stat, block);
+ TQueryOptions query_options;
+ query_options.__set_dry_run_query(dry_run);
+ runtime_stat.set_query_options(query_options);
+
+ Status st = mysql_writer.init(&runtime_stat);
EXPECT_TRUE(st.ok());
+
+ st = mysql_writer.write(&runtime_stat, block);
+ EXPECT_TRUE(st.ok());
+ EXPECT_EQ(mysql_writer.get_written_rows(), row_num);
+ EXPECT_EQ(serializer->queue_size(), dry_run ? 0 : 1);
}
TEST(DataTypeSerDeMysqlTest, ScalaSerDeTest) {
- serialize_and_deserialize_mysql_test();
+ serialize_and_deserialize_mysql_test(false);
+}
+
+TEST(DataTypeSerDeMysqlTest, DryRunSkipsSerialization) {
+ serialize_and_deserialize_mysql_test(true);
}
} // namespace doris
diff --git a/be/test/exec/pipeline/vdata_stream_recvr_test.cpp
b/be/test/exec/pipeline/vdata_stream_recvr_test.cpp
index ab6b03b13c5..f0c4e05c7e6 100644
--- a/be/test/exec/pipeline/vdata_stream_recvr_test.cpp
+++ b/be/test/exec/pipeline/vdata_stream_recvr_test.cpp
@@ -577,7 +577,7 @@ TEST_F(DataStreamRecvrTest, TestRemoteLocalMultiSender) {
struct MockVDataStreamMgr : public VDataStreamMgr {
~MockVDataStreamMgr() override = default;
Status find_recvr(const TUniqueId& fragment_instance_id, PlanNodeId
node_id,
- std::shared_ptr<VDataStreamRecvr>* res, bool
acquire_lock = true) override {
+ std::shared_ptr<VDataStreamRecvr>* res) override {
*res = recvr;
return Status::OK();
}
diff --git a/be/test/exprs/aggregate/aggregate_function_exception_test.cpp
b/be/test/exprs/aggregate/aggregate_function_exception_test.cpp
new file mode 100644
index 00000000000..21ee64dba4a
--- /dev/null
+++ b/be/test/exprs/aggregate/aggregate_function_exception_test.cpp
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <vector>
+
+#include "core/arena.h"
+#include "exprs/aggregate/aggregate_function.h"
+
+namespace doris {
+
+struct TrackingAggregateState {
+ TrackingAggregateState() { ++construct_count; }
+ ~TrackingAggregateState() { ++destroy_count; }
+
+ static void reset_counters() {
+ construct_count = 0;
+ destroy_count = 0;
+ }
+
+ static int construct_count;
+ static int destroy_count;
+};
+
+int TrackingAggregateState::construct_count = 0;
+int TrackingAggregateState::destroy_count = 0;
+
+class ThrowOnDeserializeAggregateFunction final
+ : public IAggregateFunctionDataHelper<TrackingAggregateState,
+
ThrowOnDeserializeAggregateFunction> {
+public:
+ ThrowOnDeserializeAggregateFunction()
+ : IAggregateFunctionDataHelper<TrackingAggregateState,
+
ThrowOnDeserializeAggregateFunction>(
+ DataTypes {std::make_shared<DataTypeString>()}) {}
+
+ String get_name() const override { return "throw_on_deserialize"; }
+
+ DataTypePtr get_return_type() const override { return
std::make_shared<DataTypeString>(); }
+
+ void add(AggregateDataPtr, const IColumn**, ssize_t, Arena&) const
override {}
+
+ void merge(AggregateDataPtr, ConstAggregateDataPtr, Arena&) const override
{}
+
+ void serialize(ConstAggregateDataPtr, BufferWritable& buf) const override {
+ String payload;
+ buf.write_binary(payload);
+ }
+
+ void deserialize(AggregateDataPtr, BufferReadable& buf, Arena&) const
override {
+ String payload;
+ buf.read_binary(payload);
+ if (payload == "throw") {
+ throw Exception(ErrorCode::INTERNAL_ERROR, "mock deserialize
failure");
+ }
+ }
+
+ void insert_result_into(ConstAggregateDataPtr, IColumn&) const override {}
+};
+
+class AggregateFunctionExceptionTest : public testing::Test {
+protected:
+ void SetUp() override { TrackingAggregateState::reset_counters(); }
+
+ MutableColumnPtr make_column(std::initializer_list<String> payloads) {
+ auto column = ColumnString::create();
+ VectorBufferWriter writer(*column);
+ for (const auto& payload : payloads) {
+ writer.write_binary(payload);
+ writer.commit();
+ }
+ return column;
+ }
+
+ ThrowOnDeserializeAggregateFunction function;
+ Arena arena;
+};
+
+TEST_F(AggregateFunctionExceptionTest,
DeserializeVecDestroysCurrentStateOnFailure) {
+ auto column = make_column({"ok", "throw"});
+ std::vector<char> states(function.size_of_data() * 2);
+
+ bool thrown = false;
+ try {
+ function.deserialize_vec(states.data(),
static_cast<ColumnString*>(column.get()), arena, 2);
+ } catch (const Exception&) {
+ thrown = true;
+ }
+
+ EXPECT_TRUE(thrown);
+ if (!thrown) {
+ function.destroy_vec(states.data(), 2);
+ }
+ EXPECT_EQ(TrackingAggregateState::construct_count, 2);
+ EXPECT_EQ(TrackingAggregateState::destroy_count, 2);
+}
+
+TEST_F(AggregateFunctionExceptionTest,
DeserializeAndMergeVecDestroysRhsStateOnFailure) {
+ auto column = make_column({"throw"});
+ std::vector<char> place_storage(function.size_of_data());
+ std::vector<char> rhs_storage(function.size_of_data());
+ auto* place = place_storage.data();
+ function.create(place);
+
+ std::array<AggregateDataPtr, 1> places {place};
+ const auto destroy_count_before_call =
TrackingAggregateState::destroy_count;
+ bool thrown = false;
+ try {
+ function.deserialize_and_merge_vec(places.data(), 0,
rhs_storage.data(), column.get(),
+ arena, 1);
+ } catch (const Exception&) {
+ thrown = true;
+ }
+
+ EXPECT_TRUE(thrown);
+ EXPECT_EQ(TrackingAggregateState::destroy_count -
destroy_count_before_call, 1);
+
+ function.destroy(place);
+ EXPECT_EQ(TrackingAggregateState::construct_count,
TrackingAggregateState::destroy_count);
+}
+
+TEST_F(AggregateFunctionExceptionTest,
+ DeserializeAndMergeVecSelectedDestroysAllCreatedRhsStatesOnFailure) {
+ auto column = make_column({"skip", "throw"});
+ std::vector<char> place_storage(function.size_of_data());
+ std::vector<char> rhs_storage(function.size_of_data() * 2);
+ auto* place = place_storage.data();
+ function.create(place);
+
+ std::array<AggregateDataPtr, 2> places {nullptr, place};
+ const auto destroy_count_before_call =
TrackingAggregateState::destroy_count;
+ bool thrown = false;
+ try {
+ function.deserialize_and_merge_vec_selected(places.data(), 0,
rhs_storage.data(),
+ column.get(), arena, 2);
+ } catch (const Exception&) {
+ thrown = true;
+ }
+
+ EXPECT_TRUE(thrown);
+ EXPECT_EQ(TrackingAggregateState::destroy_count -
destroy_count_before_call, 2);
+
+ function.destroy(place);
+ EXPECT_EQ(TrackingAggregateState::construct_count,
TrackingAggregateState::destroy_count);
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/test/exprs/function/function_math_test.cpp
b/be/test/exprs/function/function_math_test.cpp
index 4e51a5dc3e7..cf1b3a442ea 100644
--- a/be/test/exprs/function/function_math_test.cpp
+++ b/be/test/exprs/function/function_math_test.cpp
@@ -18,14 +18,17 @@
#include <climits>
#include <cstdint>
#include <limits>
+#include <random>
#include <string>
+#include "core/column/column_const.h"
#include "core/data_type/data_type_decimal.h"
#include "core/data_type/data_type_number.h"
#include "core/data_type/data_type_string.h"
#include "core/types.h"
#include "exprs/function/function_test_util.h"
#include "testutil/any_type.h"
+#include "testutil/column_helper.h"
namespace doris {
@@ -532,6 +535,11 @@ TEST(MathFunctionTest, hex_test) {
}
TEST(MathFunctionTest, random_test) {
+#ifndef NDEBUG
+ GTEST_SKIP() << "random(seed) exact-value assertions are release-only;
debug builds run "
+ "mock_const_execute before the real call.";
+#endif
+
std::string func_name = "random"; // random(x)
InputTypeSet input_types = {Consted {PrimitiveType::TYPE_BIGINT}};
DataSet data_set = {{{Null()}, Null()},
@@ -547,6 +555,56 @@ TEST(MathFunctionTest, random_test) {
}
}
+TEST(MathFunctionTest, uniform_mixed_const_probe_test) {
+ auto input_type = std::make_shared<DataTypeInt64>();
+ auto return_type = std::make_shared<DataTypeInt64>();
+
+ Block block;
+ auto min_data = ColumnHelper::create_column<DataTypeInt64>({1});
+ auto max_data = ColumnHelper::create_column<DataTypeInt64>({10});
+ auto seed_column = ColumnHelper::create_column<DataTypeInt64>({101, 202,
303});
+
+ block.insert({ColumnConst::create(min_data, 3), input_type, "min"});
+ block.insert({ColumnConst::create(max_data, 3), input_type, "max"});
+ block.insert({seed_column, input_type, "seed"});
+
+ FunctionBasePtr function = SimpleFunctionFactory::instance().get_function(
+ "uniform", block.get_columns_with_type_and_name(), return_type);
+ ASSERT_TRUE(function != nullptr);
+
+ block.insert({nullptr, return_type, "result"});
+
+ FunctionUtils fn_utils(return_type, {input_type, input_type, input_type},
false);
+ auto* fn_ctx = fn_utils.get_fn_ctx();
+ std::vector<std::shared_ptr<ColumnPtrWrapper>> constant_cols {
+
std::make_shared<ColumnPtrWrapper>(block.get_by_position(0).column),
+
std::make_shared<ColumnPtrWrapper>(block.get_by_position(1).column),
+ nullptr,
+ };
+ fn_ctx->set_constant_cols(constant_cols);
+
+ ASSERT_TRUE(function->open(fn_ctx, FunctionContext::FRAGMENT_LOCAL).ok());
+ ASSERT_TRUE(function->open(fn_ctx, FunctionContext::THREAD_LOCAL).ok());
+
+ auto exec_status = function->execute(fn_ctx, block, {0, 1, 2}, 3, 3);
+
+ static_cast<void>(function->close(fn_ctx, FunctionContext::THREAD_LOCAL));
+ static_cast<void>(function->close(fn_ctx,
FunctionContext::FRAGMENT_LOCAL));
+
+ ASSERT_TRUE(exec_status.ok()) << exec_status.to_string();
+
+ const auto& result_column = assert_cast<const
ColumnInt64&>(*block.get_by_position(3).column);
+ auto expected_uniform = [](int64_t seed) {
+ std::mt19937_64 generator(seed);
+ std::uniform_int_distribution<int64_t> distribution(1, 10);
+ return distribution(generator);
+ };
+
+ EXPECT_EQ(result_column.get_element(0), expected_uniform(101));
+ EXPECT_EQ(result_column.get_element(1), expected_uniform(202));
+ EXPECT_EQ(result_column.get_element(2), expected_uniform(303));
+}
+
TEST(MathFunctionTest, conv_test) {
std::string func_name = "conv";
diff --git a/be/test/exprs/function/function_quantile_state_test.cpp
b/be/test/exprs/function/function_quantile_state_test.cpp
index 1cb1ced1dae..e8f2fca7028 100644
--- a/be/test/exprs/function/function_quantile_state_test.cpp
+++ b/be/test/exprs/function/function_quantile_state_test.cpp
@@ -213,4 +213,21 @@ TEST(function_quantile_state_test,
function_quantile_state_roundtrip) {
0.01);
}
+TEST(function_quantile_state_test, function_quantile_percent_mixed_const_test)
{
+ std::string func_name = "quantile_percent";
+ InputTypeSet input_types = {PrimitiveType::TYPE_QUANTILE_STATE,
+ ConstedNotnull {PrimitiveType::TYPE_FLOAT}};
+
+ QuantileState quantile_state;
+ quantile_state.add_value(1.0);
+ quantile_state.add_value(2.0);
+ quantile_state.add_value(3.0);
+ quantile_state.add_value(4.0);
+ quantile_state.add_value(5.0);
+
+ DataSet data_set = {{{&quantile_state, 0.5F}, 3.0}};
+
+ static_cast<void>(check_function<DataTypeFloat64, false>(func_name,
input_types, data_set));
+}
+
} // namespace doris
diff --git a/be/test/exprs/function/function_string_test.cpp
b/be/test/exprs/function/function_string_test.cpp
index edf888f2c8f..90456da258a 100644
--- a/be/test/exprs/function/function_string_test.cpp
+++ b/be/test/exprs/function/function_string_test.cpp
@@ -3854,4 +3854,20 @@ TEST(function_string_test,
function_unicode_normalize_invalid_mode) {
EXPECT_NE(Status::OK(), st);
}
+TEST(function_string_test, function_regexp_count_mixed_const_test) {
+ std::string func_name = "regexp_count";
+
+ InputTypeSet input_types = {PrimitiveType::TYPE_VARCHAR,
PrimitiveType::TYPE_VARCHAR};
+ DataSet data_set = {
+ {{std::string("a.b:c;d"), std::string("[.:;]")}, std::int32_t(3)},
+ {{std::string("a1b2346c3d"), std::string("\\d+")},
std::int32_t(3)},
+ {{std::string("abcd"), std::string("")}, std::int32_t(0)},
+ {{std::string("book keeper"), std::string("oo|ee")},
std::int32_t(2)},
+ {{Null(), std::string("\\d+")}, Null()},
+ {{std::string("abcd"), Null()}, Null()},
+ };
+
+ check_function_all_arg_comb<DataTypeInt32, true>(func_name, input_types,
data_set);
+}
+
} // namespace doris
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/DatetimeFunctionBinder.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/DatetimeFunctionBinder.java
index c93f151cf0c..4e1a768bd97 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/DatetimeFunctionBinder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/DatetimeFunctionBinder.java
@@ -55,6 +55,7 @@ import
org.apache.doris.nereids.trees.expressions.functions.scalar.HourSecondSub
import org.apache.doris.nereids.trees.expressions.functions.scalar.HoursAdd;
import org.apache.doris.nereids.trees.expressions.functions.scalar.HoursDiff;
import org.apache.doris.nereids.trees.expressions.functions.scalar.HoursSub;
+import
org.apache.doris.nereids.trees.expressions.functions.scalar.MicroSecondsDiff;
import org.apache.doris.nereids.trees.expressions.functions.scalar.MinuteCeil;
import org.apache.doris.nereids.trees.expressions.functions.scalar.MinuteFloor;
import
org.apache.doris.nereids.trees.expressions.functions.scalar.MinuteMicrosecondAdd;
@@ -301,9 +302,11 @@ public class DatetimeFunctionBinder {
return new MinutesDiff(end, start);
case SECOND:
return new SecondsDiff(end, start);
+ case MICROSECOND:
+ return new MicroSecondsDiff(end, start);
default:
throw new AnalysisException("Unsupported time stamp diff time
unit: " + unit
- + ", supported time unit:
YEAR/QUARTER/MONTH/WEEK/DAY/HOUR/MINUTE/SECOND");
+ + ", supported time unit:
YEAR/QUARTER/MONTH/WEEK/DAY/HOUR/MINUTE/SECOND/MICROSECOND");
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/literal/Interval.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/literal/Interval.java
index 275e0f74fe1..f490c225c44 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/literal/Interval.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/literal/Interval.java
@@ -106,6 +106,7 @@ public class Interval extends Expression implements
UnaryExpression, AlwaysNotNu
MINUTE_SECOND("MINUTE_SECOND", false, 200),
MINUTE_MICROSECOND("MINUTE_MICROSECOND", false, 200),
SECOND("SECOND", true, 100),
+ MICROSECOND("MICROSECOND", true, 0),
SECOND_MICROSECOND("SECOND_MICROSECOND", true, 100);
private final String description;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/DatetimeFunctionBinderTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/DatetimeFunctionBinderTest.java
index 81f24ed878b..a63e4a3e628 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/DatetimeFunctionBinderTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/DatetimeFunctionBinderTest.java
@@ -42,6 +42,7 @@ import
org.apache.doris.nereids.trees.expressions.functions.scalar.HourFloor;
import org.apache.doris.nereids.trees.expressions.functions.scalar.HoursAdd;
import org.apache.doris.nereids.trees.expressions.functions.scalar.HoursDiff;
import org.apache.doris.nereids.trees.expressions.functions.scalar.HoursSub;
+import
org.apache.doris.nereids.trees.expressions.functions.scalar.MicroSecondsDiff;
import org.apache.doris.nereids.trees.expressions.functions.scalar.MinuteCeil;
import org.apache.doris.nereids.trees.expressions.functions.scalar.MinuteFloor;
import org.apache.doris.nereids.trees.expressions.functions.scalar.MinutesAdd;
@@ -110,6 +111,8 @@ public class DatetimeFunctionBinderTest {
TinyIntType.INSTANCE, false, ImmutableList.of());
private final SlotReference secondUnit = new SlotReference(new ExprId(-1),
"SECOND",
TinyIntType.INSTANCE, false, ImmutableList.of());
+ private final SlotReference microsecondUnit = new SlotReference(new
ExprId(-1), "MICROSECOND",
+ TinyIntType.INSTANCE, false, ImmutableList.of());
private final SlotReference invalidUnit = new SlotReference(new
ExprId(-1), "INVALID",
TinyIntType.INSTANCE, false, ImmutableList.of());
@@ -172,6 +175,13 @@ public class DatetimeFunctionBinderTest {
Assertions.assertEquals(dateTimeV2Literal2, result.child(0));
Assertions.assertEquals(dateTimeV2Literal1, result.child(1));
+ timeDiff = new UnboundFunction(functionName, ImmutableList.of(
+ microsecondUnit, dateTimeV2Literal1, dateTimeV2Literal2));
+ result = DatetimeFunctionBinder.INSTANCE.bind(timeDiff);
+ Assertions.assertInstanceOf(MicroSecondsDiff.class, result);
+ Assertions.assertEquals(dateTimeV2Literal2, result.child(0));
+ Assertions.assertEquals(dateTimeV2Literal1, result.child(1));
+
Assertions.assertThrowsExactly(AnalysisException.class,
() -> DatetimeFunctionBinder.INSTANCE.bind(
new UnboundFunction(functionName,
ImmutableList.of(invalidUnit,
diff --git
a/regression-test/data/datatype_p0/timestamptz/test_timestamptz_agg_functions.out
b/regression-test/data/datatype_p0/timestamptz/test_timestamptz_agg_functions.out
index 850cbe14a98..f7ff2eb36d0 100644
---
a/regression-test/data/datatype_p0/timestamptz/test_timestamptz_agg_functions.out
+++
b/regression-test/data/datatype_p0/timestamptz/test_timestamptz_agg_functions.out
@@ -11,3 +11,6 @@ true
-- !group_array_union --
3
+-- !group_array_nested_timestamptz --
+[["2024-01-01 00:00:00.000000+00:00", "2024-01-01 00:00:00.000000+00:00",
"2024-01-02 00:00:00.000000+00:00"], ["2024-01-01 00:00:00.000000+00:00",
"2024-01-02 00:00:00.000000+00:00", "2024-01-03 00:00:00.000000+00:00"]]
+
diff --git
a/regression-test/data/datatype_p0/timestamptz/test_timestamptz_map_contains_entry.out
b/regression-test/data/datatype_p0/timestamptz/test_timestamptz_map_contains_entry.out
new file mode 100644
index 00000000000..43746eee180
--- /dev/null
+++
b/regression-test/data/datatype_p0/timestamptz/test_timestamptz_map_contains_entry.out
@@ -0,0 +1,43 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !value_hit --
+true
+
+-- !value_miss --
+false
+
+-- !value_miss_key --
+false
+
+-- !key_hit --
+true
+
+-- !key_miss_value --
+false
+
+-- !key_miss_key --
+false
+
+-- !table_value_hit --
+1 true
+2 false
+
+-- !table_value_miss --
+1 false
+2 false
+
+-- !table_key_hit --
+1 true
+2 false
+
+-- !table_key_miss --
+1 false
+2 false
+
+-- !null_search_key --
+1 false
+2 false
+
+-- !null_search_value --
+1 false
+2 false
+
diff --git a/regression-test/data/nereids_syntax_p0/test_timestampdiff.out
b/regression-test/data/nereids_syntax_p0/test_timestampdiff.out
index 0e2dd6a5375..15623515ed4 100644
--- a/regression-test/data/nereids_syntax_p0/test_timestampdiff.out
+++ b/regression-test/data/nereids_syntax_p0/test_timestampdiff.out
@@ -17,3 +17,9 @@
-- !select --
40
+-- !select --
+876543
+
+-- !select --
+2024-01-01T10:00:00.999999 2024-01-01T10:00:00.123456 876543
+
diff --git
a/regression-test/suites/datatype_p0/timestamptz/test_timestamptz_agg_functions.groovy
b/regression-test/suites/datatype_p0/timestamptz/test_timestamptz_agg_functions.groovy
index 89126b5a284..e5bf945225e 100644
---
a/regression-test/suites/datatype_p0/timestamptz/test_timestamptz_agg_functions.groovy
+++
b/regression-test/suites/datatype_p0/timestamptz/test_timestamptz_agg_functions.groovy
@@ -56,4 +56,41 @@ suite("test_timestamptz_agg_functions", "datatype_p0") {
qt_group_array_union "SELECT size(group_array_union(arr)) FROM test_tz_agg"
sql "DROP TABLE IF EXISTS test_tz_agg"
+
+ sql "DROP TABLE IF EXISTS tz_group_array_crash"
+ sql """
+ CREATE TABLE tz_group_array_crash (
+ grp INT,
+ arr ARRAY<TIMESTAMPTZ(6)>
+ )
+ DUPLICATE KEY(grp)
+ DISTRIBUTED BY HASH(grp) BUCKETS 1
+ PROPERTIES('replication_num' = '1')
+ """
+
+ sql """
+ INSERT INTO tz_group_array_crash VALUES
+ (
+ 1,
+ ARRAY(
+ CAST('2024-01-01 00:00:00 +00:00' AS TIMESTAMPTZ(6)),
+ CAST('2024-01-01 08:00:00 +08:00' AS TIMESTAMPTZ(6)),
+ CAST('2024-01-02 00:00:00 +00:00' AS TIMESTAMPTZ(6))
+ )
+ ),
+ (
+ 1,
+ ARRAY(
+ CAST('2024-01-01 00:00:00 +00:00' AS TIMESTAMPTZ(6)),
+ CAST('2024-01-02 08:00:00 +08:00' AS TIMESTAMPTZ(6)),
+ CAST('2024-01-03 00:00:00 +00:00' AS TIMESTAMPTZ(6))
+ )
+ )
+ """
+
+ qt_group_array_nested_timestamptz """
+ SELECT CAST(array_sort(group_array(arr)) AS STRING)
+ FROM tz_group_array_crash
+ GROUP BY grp
+ """
}
diff --git
a/regression-test/suites/datatype_p0/timestamptz/test_timestamptz_map_contains_entry.groovy
b/regression-test/suites/datatype_p0/timestamptz/test_timestamptz_map_contains_entry.groovy
new file mode 100644
index 00000000000..2b814ef8b6e
--- /dev/null
+++
b/regression-test/suites/datatype_p0/timestamptz/test_timestamptz_map_contains_entry.groovy
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_timestamptz_map_contains_entry") {
+
+ sql "set time_zone = '+08:00';"
+ sql "set enable_nereids_planner = true;"
+ sql "set enable_fallback_to_original_planner = false;"
+
+ // --- inline literal tests (no table needed) ---
+
+ // TIMESTAMPTZ as map value: hit
+ qt_value_hit """
+ SELECT map_contains_entry(
+ map('a', cast('2024-01-01 00:00:00.000000 +00:00' as
timestamptz(6)),
+ 'b', cast('2024-01-02 03:04:05.123456 +00:00' as
timestamptz(6))),
+ 'a',
+ cast('2024-01-01 00:00:00.000000 +00:00' as timestamptz(6))
+ );
+ """
+
+ // TIMESTAMPTZ as map value: miss (wrong value)
+ qt_value_miss """
+ SELECT map_contains_entry(
+ map('a', cast('2024-01-01 00:00:00.000000 +00:00' as
timestamptz(6)),
+ 'b', cast('2024-01-02 03:04:05.123456 +00:00' as
timestamptz(6))),
+ 'a',
+ cast('2024-01-02 03:04:05.123456 +00:00' as timestamptz(6))
+ );
+ """
+
+ // TIMESTAMPTZ as map value: miss (wrong key)
+ qt_value_miss_key """
+ SELECT map_contains_entry(
+ map('a', cast('2024-01-01 00:00:00.000000 +00:00' as
timestamptz(6)),
+ 'b', cast('2024-01-02 03:04:05.123456 +00:00' as
timestamptz(6))),
+ 'c',
+ cast('2024-01-01 00:00:00.000000 +00:00' as timestamptz(6))
+ );
+ """
+
+ // TIMESTAMPTZ as map key: hit
+ qt_key_hit """
+ SELECT map_contains_entry(
+ map(cast('2024-01-01 00:00:00.000000 +00:00' as timestamptz(6)),
'a',
+ cast('2024-01-02 03:04:05.123456 +00:00' as timestamptz(6)),
'b'),
+ cast('2024-01-01 00:00:00.000000 +00:00' as timestamptz(6)),
+ 'a'
+ );
+ """
+
+ // TIMESTAMPTZ as map key: miss (wrong value)
+ qt_key_miss_value """
+ SELECT map_contains_entry(
+ map(cast('2024-01-01 00:00:00.000000 +00:00' as timestamptz(6)),
'a',
+ cast('2024-01-02 03:04:05.123456 +00:00' as timestamptz(6)),
'b'),
+ cast('2024-01-01 00:00:00.000000 +00:00' as timestamptz(6)),
+ 'b'
+ );
+ """
+
+ // TIMESTAMPTZ as map key: miss (wrong key)
+ qt_key_miss_key """
+ SELECT map_contains_entry(
+ map(cast('2024-01-01 00:00:00.000000 +00:00' as timestamptz(6)),
'a',
+ cast('2024-01-02 03:04:05.123456 +00:00' as timestamptz(6)),
'b'),
+ cast('2024-01-03 00:00:00.000000 +00:00' as timestamptz(6)),
+ 'a'
+ );
+ """
+
+ // --- table-based tests ---
+
+ sql "DROP TABLE IF EXISTS test_timestamptz_map_contains_entry_t;"
+ sql """
+ CREATE TABLE test_timestamptz_map_contains_entry_t (
+ id INT,
+ map_s_tz MAP<STRING, TIMESTAMPTZ(6)>,
+ map_tz_s MAP<TIMESTAMPTZ(6), STRING>
+ )
+ DUPLICATE KEY(id)
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES("replication_num" = "1");
+ """
+
+ sql """
+ INSERT INTO test_timestamptz_map_contains_entry_t VALUES (
+ 1,
+ map('a', cast('2024-01-01 00:00:00.000000 +00:00' as
timestamptz(6)),
+ 'b', cast('2024-01-02 03:04:05.123456 +00:00' as
timestamptz(6))),
+ map(cast('2024-01-01 00:00:00.000000 +00:00' as timestamptz(6)),
'a',
+ cast('2024-01-02 03:04:05.123456 +00:00' as timestamptz(6)),
'b')
+ ), (
+ 2,
+ map('x', cast('2024-06-15 12:00:00.000000 +05:30' as
timestamptz(6))),
+ map(cast('2024-06-15 12:00:00.000000 +05:30' as timestamptz(6)),
'x')
+ );
+ """
+
+ // TIMESTAMPTZ as map value, hit
+ qt_table_value_hit """
+ SELECT id, map_contains_entry(map_s_tz, 'a', cast('2024-01-01
00:00:00.000000 +00:00' as timestamptz(6)))
+ FROM test_timestamptz_map_contains_entry_t
+ ORDER BY id;
+ """
+
+ // TIMESTAMPTZ as map value, miss
+ qt_table_value_miss """
+ SELECT id, map_contains_entry(map_s_tz, 'a', cast('2024-01-02
03:04:05.123456 +00:00' as timestamptz(6)))
+ FROM test_timestamptz_map_contains_entry_t
+ ORDER BY id;
+ """
+
+ // TIMESTAMPTZ as map key, hit
+ qt_table_key_hit """
+ SELECT id, map_contains_entry(map_tz_s, cast('2024-01-01
00:00:00.000000 +00:00' as timestamptz(6)), 'a')
+ FROM test_timestamptz_map_contains_entry_t
+ ORDER BY id;
+ """
+
+ // TIMESTAMPTZ as map key, miss
+ qt_table_key_miss """
+ SELECT id, map_contains_entry(map_tz_s, cast('2024-01-01
00:00:00.000000 +00:00' as timestamptz(6)), 'b')
+ FROM test_timestamptz_map_contains_entry_t
+ ORDER BY id;
+ """
+
+ // NULL search key
+ qt_null_search_key """
+ SELECT id, map_contains_entry(map_s_tz, NULL, cast('2024-01-01
00:00:00.000000 +00:00' as timestamptz(6)))
+ FROM test_timestamptz_map_contains_entry_t
+ ORDER BY id;
+ """
+
+ // NULL search value
+ qt_null_search_value """
+ SELECT id, map_contains_entry(map_s_tz, 'a', cast(NULL as
timestamptz(6)))
+ FROM test_timestamptz_map_contains_entry_t
+ ORDER BY id;
+ """
+}
diff --git
a/regression-test/suites/nereids_function_p0/scalar_function/U.groovy
b/regression-test/suites/nereids_function_p0/scalar_function/U.groovy
index 68642fa31ec..f43bc4cb6ee 100644
--- a/regression-test/suites/nereids_function_p0/scalar_function/U.groovy
+++ b/regression-test/suites/nereids_function_p0/scalar_function/U.groovy
@@ -62,6 +62,8 @@ suite("nereids_scalar_fn_U") {
def result = sql """select uniform(1, 100, random()*10000) from
numbers("number" = "10");"""
assertTrue(result.size() == 10)
+ def doubleResult = sql """select uniform(1.23, 100.100, random()*10000)
from numbers("number" = "10");"""
+ assertTrue(doubleResult.size() == 10)
test {
sql """select uniform(100, 1, random()*10000) from
numbers("number" = "10");"""
exception "uniform's min should be less than max"
diff --git a/regression-test/suites/nereids_syntax_p0/test_timestampdiff.groovy
b/regression-test/suites/nereids_syntax_p0/test_timestampdiff.groovy
index 34500732e22..0a3e563bd7f 100644
--- a/regression-test/suites/nereids_syntax_p0/test_timestampdiff.groovy
+++ b/regression-test/suites/nereids_syntax_p0/test_timestampdiff.groovy
@@ -37,4 +37,32 @@ suite("test_timestampdiff") {
qt_select """
SELECT TIMESTAMPDIFF(second,'2003-02-03 11:00:00','2003-02-03
11:00:40');
"""
+
+ qt_select """
+ SELECT TIMESTAMPDIFF(microsecond,
+ CAST('2024-01-01 10:00:00.123456' AS DATETIMEV2(6)),
+ CAST('2024-01-01 10:00:00.999999' AS DATETIMEV2(6)));
+ """
+
+ sql """drop table if exists test_timestampdiff_microsecond"""
+ sql """
+ create table test_timestampdiff_microsecond (
+ id int,
+ t datetimev2(6)
+ )
+ duplicate key(id)
+ distributed by hash(id) buckets 1
+ properties("replication_num" = "1");
+ """
+
+ sql """
+ insert into test_timestampdiff_microsecond values
+ (1, '2024-01-01 10:00:00.123456'),
+ (2, '2024-01-01 10:00:00.999999');
+ """
+
+ qt_select """
+ SELECT MAX(t), MIN(t), TIMESTAMPDIFF(MICROSECOND, MIN(t), MAX(t))
+ FROM test_timestampdiff_microsecond;
+ """
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]