This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 8930df3b31d [Feature](iceberg-writer) Implements iceberg partition transform. (#37692)
8930df3b31d is described below
commit 8930df3b31d25a4b2c85e3661c1afe520e74c7df
Author: Qi Chen <[email protected]>
AuthorDate: Sat Jul 13 16:07:50 2024 +0800
[Feature](iceberg-writer) Implements iceberg partition transform. (#37692)
## Proposed changes
Cherry-picks the iceberg partition transform functionality from #36289 and #36889.
---------
Co-authored-by: kang <[email protected]>
Co-authored-by: lik40 <[email protected]>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Mingyu Chen <[email protected]>
---
be/src/util/bit_util.h | 22 +
.../sink/writer/iceberg/partition_transformers.cpp | 172 ++-
.../sink/writer/iceberg/partition_transformers.h | 1274 +++++++++++++++++++-
.../sink/writer/iceberg/viceberg_table_writer.cpp | 22 +-
.../writer/iceberg/partition_transformers_test.cpp | 489 ++++++++
.../apache/doris/common/info/SimpleTableInfo.java | 66 +
.../datasource/iceberg/IcebergMetadataCache.java | 19 +-
.../datasource/iceberg/IcebergMetadataOps.java | 4 +
.../datasource/iceberg/IcebergTransaction.java | 211 ++--
.../doris/datasource/iceberg/IcebergUtils.java | 64 +-
.../iceberg/helper/IcebergWriterHelper.java | 91 ++
.../iceberg/source/IcebergApiSource.java | 2 +-
.../iceberg/source/IcebergHMSSource.java | 4 +-
.../datasource/statistics/CommonStatistics.java | 81 ++
.../commands/insert/IcebergInsertExecutor.java | 28 +-
.../org/apache/doris/planner/IcebergTableSink.java | 2 +-
.../transaction/IcebergTransactionManager.java | 7 +-
.../datasource/iceberg/IcebergTransactionTest.java | 139 ++-
18 files changed, 2468 insertions(+), 229 deletions(-)
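For orientation: this backport adds the Iceberg partition transforms (identity, truncate[W], bucket[N], year, month, day, hour, void) to the BE Iceberg table writer and the FE-side commit plumbing. The per-value semantics that the vectorized transforms apply column-wise can be sketched as follows. This is an illustrative standalone snippet, not code from the commit, and std::hash stands in for the murmur3_32 hash the BE actually uses:

    // Illustrative sketch (not from this commit): the value -> partition-value
    // mapping behind the truncate and bucket transforms added below.
    #include <cstdint>
    #include <functional>
    #include <iostream>

    // truncate[W]: floor the value to the nearest lower multiple of W.
    // The ((v % W) + W) % W form keeps negative values flooring downward.
    int64_t truncate_value(int64_t v, int64_t width) {
        return v - (((v % width) + width) % width);
    }

    // bucket[N]: hash the value, clear the sign bit, take it modulo N.
    // The commit hashes with HashUtil::murmur_hash3_32; std::hash is only a
    // stand-in so this sketch compiles on its own.
    int32_t bucket_value(int64_t v, int32_t bucket_num) {
        uint32_t h = static_cast<uint32_t>(std::hash<int64_t>{}(v));
        return static_cast<int32_t>((h & INT32_MAX) % bucket_num);
    }

    int main() {
        std::cout << truncate_value(17, 10) << "\n";  // 10
        std::cout << truncate_value(-3, 10) << "\n";  // -10
        std::cout << bucket_value(42, 16) << "\n";    // a bucket in [0, 16)
        return 0;
    }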
diff --git a/be/src/util/bit_util.h b/be/src/util/bit_util.h
index 230134ade09..6934f45ef3e 100644
--- a/be/src/util/bit_util.h
+++ b/be/src/util/bit_util.h
@@ -98,6 +98,28 @@ public:
return (v << n) >> n;
}
+ template <typename T>
+ static std::string IntToByteBuffer(T input) {
+ std::string buffer;
+ T value = input;
+ for (int i = 0; i < sizeof(value); ++i) {
+ // Applies a mask for a byte range on the input.
+ char value_to_save = value & 0XFF;
+ buffer.push_back(value_to_save);
+ // Remove the just processed part from the input so that we can exit early if there
+ // is nothing left to process.
+ value >>= 8;
+ if (value == 0 && value_to_save >= 0) {
+ break;
+ }
+ if (value == -1 && value_to_save < 0) {
+ break;
+ }
+ }
+ std::reverse(buffer.begin(), buffer.end());
+ return buffer;
+ }
+
// Returns ceil(log2(x)).
// TODO: this could be faster if we use __builtin_clz. Fix this if this ever shows up
// in a hot path.
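The new BitUtil::IntToByteBuffer produces the shortest big-endian two's-complement byte string for an integer; the decimal bucket transform later feeds those bytes into murmur3, matching how the Iceberg spec hashes decimals (minimal two's-complement big-endian bytes). The early-exit tests above appear to assume a signed plain char. A self-contained restatement with a small demo, purely illustrative:

    #include <algorithm>
    #include <cstdint>
    #include <iomanip>
    #include <iostream>
    #include <string>

    // Same idea as BitUtil::IntToByteBuffer above: emit the minimal big-endian
    // two's-complement byte string for an integer value.
    template <typename T>
    std::string int_to_byte_buffer(T input) {
        std::string buffer;
        T value = input;
        for (size_t i = 0; i < sizeof(value); ++i) {
            char byte = static_cast<char>(value & 0xFF);
            buffer.push_back(byte);
            value >>= 8;
            if (value == 0 && byte >= 0) break;   // remaining bytes would all be 0x00
            if (value == -1 && byte < 0) break;   // remaining bytes would all be 0xFF
        }
        std::reverse(buffer.begin(), buffer.end());
        return buffer;
    }

    int main() {
        for (int64_t v : {int64_t{300}, int64_t{-1}, int64_t{123456}}) {
            std::cout << v << " ->" << std::hex << std::setfill('0');
            for (unsigned char c : int_to_byte_buffer(v)) {
                std::cout << ' ' << std::setw(2) << static_cast<int>(c);
            }
            std::cout << std::dec << "\n";  // e.g. 300 -> 01 2c
        }
        return 0;
    }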
diff --git a/be/src/vec/sink/writer/iceberg/partition_transformers.cpp b/be/src/vec/sink/writer/iceberg/partition_transformers.cpp
index 0faebea6295..ee8268d30f7 100644
--- a/be/src/vec/sink/writer/iceberg/partition_transformers.cpp
+++ b/be/src/vec/sink/writer/iceberg/partition_transformers.cpp
@@ -25,31 +25,109 @@
namespace doris {
namespace vectorized {
+const std::chrono::sys_days PartitionColumnTransformUtils::EPOCH = std::chrono::sys_days(
+        std::chrono::year {1970} / std::chrono::January / std::chrono::day {1});
+
std::unique_ptr<PartitionColumnTransform> PartitionColumnTransforms::create(
        const doris::iceberg::PartitionField& field, const TypeDescriptor& source_type) {
auto& transform = field.transform();
- static const std::regex hasWidth(R"((\w+)\[(\d+)\])");
+ static const std::regex has_width(R"((\w+)\[(\d+)\])");
std::smatch width_match;
- if (std::regex_match(transform, width_match, hasWidth)) {
+ if (std::regex_match(transform, width_match, has_width)) {
std::string name = width_match[1];
- //int parsed_width = std::stoi(width_match[2]);
+ int parsed_width = std::stoi(width_match[2]);
if (name == "truncate") {
switch (source_type.type) {
+ case TYPE_INT: {
+ return std::make_unique<IntegerTruncatePartitionColumnTransform>(source_type, parsed_width);
+ }
+ case TYPE_BIGINT: {
+ return std::make_unique<BigintTruncatePartitionColumnTransform>(source_type, parsed_width);
+ }
+ case TYPE_VARCHAR:
+ case TYPE_CHAR:
+ case TYPE_STRING: {
+ return std::make_unique<StringTruncatePartitionColumnTransform>(source_type, parsed_width);
+ }
+ case TYPE_DECIMALV2: {
+ return std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal128V2>>(source_type, parsed_width);
+ }
+ case TYPE_DECIMAL32: {
+ return std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal32>>(source_type, parsed_width);
+ }
+ case TYPE_DECIMAL64: {
+ return std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal64>>(source_type, parsed_width);
+ }
+ case TYPE_DECIMAL128I: {
+ return std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal128V3>>(source_type, parsed_width);
+ }
+ case TYPE_DECIMAL256: {
+ return std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal256>>(source_type, parsed_width);
+ }
            default: {
-                throw doris::Exception(
-                        doris::ErrorCode::INTERNAL_ERROR,
-                        "Unsupported type for truncate partition column transform {}",
-                        source_type.debug_string());
+                throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+                        "Unsupported type {} for partition column transform {}",
+                        source_type.debug_string(), transform);
            }
}
} else if (name == "bucket") {
switch (source_type.type) {
+ case TYPE_INT: {
+ return std::make_unique<IntBucketPartitionColumnTransform>(source_type, parsed_width);
+ }
+ case TYPE_BIGINT: {
+ return std::make_unique<BigintBucketPartitionColumnTransform>(source_type, parsed_width);
+ }
+ case TYPE_VARCHAR:
+ case TYPE_CHAR:
+ case TYPE_STRING: {
+ return std::make_unique<StringBucketPartitionColumnTransform>(source_type, parsed_width);
+ }
+ case TYPE_DATEV2: {
+ return std::make_unique<DateBucketPartitionColumnTransform>(source_type, parsed_width);
+ }
+ case TYPE_DATETIMEV2: {
+ return std::make_unique<TimestampBucketPartitionColumnTransform>(source_type, parsed_width);
+ }
+ case TYPE_DECIMALV2: {
+ return std::make_unique<DecimalBucketPartitionColumnTransform<Decimal128V2>>(source_type, parsed_width);
+ }
+ case TYPE_DECIMAL32: {
+ return std::make_unique<DecimalBucketPartitionColumnTransform<Decimal32>>(source_type, parsed_width);
+ }
+ case TYPE_DECIMAL64: {
+ return std::make_unique<DecimalBucketPartitionColumnTransform<Decimal64>>(source_type, parsed_width);
+ }
+ case TYPE_DECIMAL128I: {
+ return std::make_unique<DecimalBucketPartitionColumnTransform<Decimal128V3>>(source_type, parsed_width);
+ }
+ case TYPE_DECIMAL256: {
+ return std::make_unique<DecimalBucketPartitionColumnTransform<Decimal256>>(source_type, parsed_width);
+ }
            default: {
                throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
-                        "Unsupported type for bucket partition column transform {}",
-                        source_type.debug_string());
+                        "Unsupported type {} for partition column transform {}",
+                        source_type.debug_string(), transform);
}
}
}
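Note on the decimal cases above: truncation is applied to the unscaled integer representation, so a width of W means W units of the column's smallest fractional step, which mirrors how the Iceberg spec handles decimal truncation. A small illustrative sketch, not taken from the commit:

    #include <cstdint>
    #include <iostream>

    // truncate[W] on a decimal works on the unscaled value, using the same
    // floor-to-multiple formula as for integers.
    int64_t truncate_unscaled(int64_t unscaled, int64_t width) {
        return unscaled - (((unscaled % width) + width) % width);
    }

    int main() {
        // decimal(9,2): 12.37 is stored as unscaled 1237; truncate[50] -> 1200, i.e. 12.00
        std::cout << truncate_unscaled(1237, 50) << "\n";   // 1200
        // negative values floor toward negative infinity: -12.37 -> -12.50
        std::cout << truncate_unscaled(-1237, 50) << "\n";  // -1250
        return 0;
    }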
@@ -57,14 +135,79 @@ std::unique_ptr<PartitionColumnTransform> PartitionColumnTransforms::create(
if (transform == "identity") {
return std::make_unique<IdentityPartitionColumnTransform>(source_type);
+ } else if (transform == "year") {
+ switch (source_type.type) {
+ case TYPE_DATEV2: {
+ return
std::make_unique<DateYearPartitionColumnTransform>(source_type);
+ }
+ case TYPE_DATETIMEV2: {
+ return
std::make_unique<TimestampYearPartitionColumnTransform>(source_type);
+ }
+ default: {
+ throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+ "Unsupported type {} for partition column
transform {}",
+ source_type.debug_string(), transform);
+ }
+ }
+ } else if (transform == "month") {
+ switch (source_type.type) {
+ case TYPE_DATEV2: {
+ return
std::make_unique<DateMonthPartitionColumnTransform>(source_type);
+ }
+ case TYPE_DATETIMEV2: {
+ return
std::make_unique<TimestampMonthPartitionColumnTransform>(source_type);
+ }
+ default: {
+ throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+ "Unsupported type {} for partition column
transform {}",
+ source_type.debug_string(), transform);
+ }
+ }
+ } else if (transform == "day") {
+ switch (source_type.type) {
+ case TYPE_DATEV2: {
+ return
std::make_unique<DateDayPartitionColumnTransform>(source_type);
+ }
+ case TYPE_DATETIMEV2: {
+ return
std::make_unique<TimestampDayPartitionColumnTransform>(source_type);
+ }
+ default: {
+ throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+ "Unsupported type {} for partition column
transform {}",
+ source_type.debug_string(), transform);
+ }
+ }
+ } else if (transform == "hour") {
+ switch (source_type.type) {
+ case TYPE_DATETIMEV2: {
+ return
std::make_unique<TimestampHourPartitionColumnTransform>(source_type);
+ }
+ default: {
+ throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+ "Unsupported type {} for partition column
transform {}",
+ source_type.debug_string(), transform);
+ }
+ }
+ } else if (transform == "void") {
+ return std::make_unique<VoidPartitionColumnTransform>(source_type);
} else {
throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
- "Unsupported partition column transform: {}.",
transform);
+ "Unsupported type {} for partition column
transform {}",
+ source_type.debug_string(), transform);
}
}
+std::string PartitionColumnTransform::name() const {
+ return "default";
+}
+
std::string PartitionColumnTransform::to_human_string(const TypeDescriptor& type,
        const std::any& value) const {
+ return get_partition_value(type, value);
+}
+
+std::string PartitionColumnTransform::get_partition_value(const TypeDescriptor& type,
+         const std::any& value) const {
if (value.has_value()) {
switch (type.type) {
case TYPE_BOOLEAN: {
@@ -131,19 +274,12 @@ std::string PartitionColumnTransform::to_human_string(const TypeDescriptor& type
        }
        default: {
            throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
-                    "Unsupported partition column transform: {}",
-                    type.debug_string());
+                    "Unsupported type {} for partition", type.debug_string());
}
}
}
return "null";
}
-ColumnWithTypeAndName IdentityPartitionColumnTransform::apply(Block& block, int idx) {
- const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(idx);
- return {column_with_type_and_name.column, column_with_type_and_name.type,
-         column_with_type_and_name.name};
-}
-
} // namespace vectorized
} // namespace doris
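The factory above resolves the transform spec string stored in the Iceberg PartitionField: specs either carry a width ("truncate[10]", "bucket[16]") and are matched by the has_width regex, or are bare names ("identity", "year", "month", "day", "hour", "void"). A minimal sketch of that parsing step, for illustration only:

    #include <iostream>
    #include <regex>
    #include <string>
    #include <vector>

    int main() {
        // Same pattern as PartitionColumnTransforms::create(): "name[width]" vs. bare name.
        static const std::regex has_width(R"((\w+)\[(\d+)\])");
        const std::vector<std::string> specs = {"truncate[10]", "bucket[16]", "identity", "hour"};
        for (const std::string& spec : specs) {
            std::smatch m;
            if (std::regex_match(spec, m, has_width)) {
                std::cout << m[1] << " with width " << std::stoi(m[2].str()) << "\n";
            } else {
                std::cout << spec << " (no width)\n";
            }
        }
        return 0;
    }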
diff --git a/be/src/vec/sink/writer/iceberg/partition_transformers.h b/be/src/vec/sink/writer/iceberg/partition_transformers.h
index 13c6238d1db..304c5834844 100644
--- a/be/src/vec/sink/writer/iceberg/partition_transformers.h
+++ b/be/src/vec/sink/writer/iceberg/partition_transformers.h
@@ -45,25 +45,77 @@ public:
        const doris::iceberg::PartitionField& field, const TypeDescriptor& source_type);
};
+class PartitionColumnTransformUtils {
+public:
+ static DateV2Value<DateV2ValueType>& epoch_date() {
+ static DateV2Value<DateV2ValueType> epoch_date;
+ static bool initialized = false;
+ if (!initialized) {
+ epoch_date.from_date_str("1970-01-01 00:00:00", 19);
+ initialized = true;
+ }
+ return epoch_date;
+ }
+
+ static DateV2Value<DateTimeV2ValueType>& epoch_datetime() {
+ static DateV2Value<DateTimeV2ValueType> epoch_datetime;
+ static bool initialized = false;
+ if (!initialized) {
+ epoch_datetime.from_date_str("1970-01-01 00:00:00", 19);
+ initialized = true;
+ }
+ return epoch_datetime;
+ }
+
+ static std::string human_year(int year_ordinal) {
+ auto ymd = std::chrono::year_month_day {EPOCH} + std::chrono::years(year_ordinal);
+ return std::to_string(static_cast<int>(ymd.year()));
+ }
+
+ static std::string human_month(int month_ordinal) {
+ auto ymd = std::chrono::year_month_day {EPOCH} + std::chrono::months(month_ordinal);
+ return fmt::format("{:04d}-{:02d}", static_cast<int>(ymd.year()),
+         static_cast<unsigned>(ymd.month()));
+ }
+
+ static std::string human_day(int day_ordinal) {
+ auto ymd = std::chrono::year_month_day(std::chrono::sys_days(
+         std::chrono::floor<std::chrono::days>(EPOCH + std::chrono::days(day_ordinal))));
+ return fmt::format("{:04d}-{:02d}-{:02d}", static_cast<int>(ymd.year()),
+         static_cast<unsigned>(ymd.month()), static_cast<unsigned>(ymd.day()));
+ }
+
+ static std::string human_hour(int hour_ordinal) {
+ int day_value = hour_ordinal / 24;
+ int housr_value = hour_ordinal % 24;
+ auto ymd = std::chrono::year_month_day(std::chrono::sys_days(
+         std::chrono::floor<std::chrono::days>(EPOCH + std::chrono::days(day_value))));
+ return fmt::format("{:04d}-{:02d}-{:02d}-{:02d}", static_cast<int>(ymd.year()),
+         static_cast<unsigned>(ymd.month()), static_cast<unsigned>(ymd.day()),
+         housr_value);
+ }
+
+private:
+ static const std::chrono::sys_days EPOCH;
+ PartitionColumnTransformUtils() = default;
+};
+
class PartitionColumnTransform {
public:
PartitionColumnTransform() = default;
virtual ~PartitionColumnTransform() = default;
- virtual bool preserves_non_null() const { return false; }
-
- virtual bool monotonic() const { return true; }
-
- virtual bool temporal() const { return false; }
+ virtual std::string name() const;
virtual const TypeDescriptor& get_result_type() const = 0;
- virtual bool is_void() const { return false; }
-
- virtual ColumnWithTypeAndName apply(Block& block, int idx) = 0;
+ virtual ColumnWithTypeAndName apply(const Block& block, int column_pos) = 0;
    virtual std::string to_human_string(const TypeDescriptor& type, const std::any& value) const;
+
+ virtual std::string get_partition_value(const TypeDescriptor& type,
+ const std::any& value) const;
};
class IdentityPartitionColumnTransform : public PartitionColumnTransform {
@@ -71,12 +123,1214 @@ public:
IdentityPartitionColumnTransform(const TypeDescriptor& source_type)
: _source_type(source_type) {}
- virtual const TypeDescriptor& get_result_type() const { return _source_type; }
+ std::string name() const override { return "Identity"; }
+
+ const TypeDescriptor& get_result_type() const override { return _source_type; }
+
+ ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+ const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos);
+ return {column_with_type_and_name.column, column_with_type_and_name.type,
+ column_with_type_and_name.name};
+ }
+
+private:
+ TypeDescriptor _source_type;
+};
+
+class StringTruncatePartitionColumnTransform : public PartitionColumnTransform
{
+public:
+ StringTruncatePartitionColumnTransform(const TypeDescriptor& source_type,
int width)
+ : _source_type(source_type), _width(width) {}
+
+ std::string name() const override { return "StringTruncate"; }
+
+ const TypeDescriptor& get_result_type() const override { return
_source_type; }
+
+ ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+ static_cast<void>(_width);
+ auto int_type = std::make_shared<DataTypeInt32>();
+ const ColumnWithTypeAndName& column_with_type_and_name =
block.get_by_position(column_pos);
+
+ ColumnPtr string_column_ptr;
+ ColumnPtr null_map_column_ptr;
+ bool is_nullable = false;
+ if (auto* nullable_column =
+
check_and_get_column<ColumnNullable>(column_with_type_and_name.column)) {
+ null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+ string_column_ptr = nullable_column->get_nested_column_ptr();
+ is_nullable = true;
+ } else {
+ string_column_ptr = column_with_type_and_name.column;
+ is_nullable = false;
+ }
+
+ // Create a temp_block to execute substring function.
+ Block temp_block;
+ temp_block.insert(column_with_type_and_name);
+ temp_block.insert({int_type->create_column_const(temp_block.rows(),
to_field(1)), int_type,
+ "const 1"});
+ temp_block.insert({int_type->create_column_const(temp_block.rows(),
to_field(_width)),
+ int_type, fmt::format("const {}", _width)});
+ temp_block.insert({nullptr, std::make_shared<DataTypeString>(),
"result"});
+ ColumnNumbers temp_arguments(3);
+ temp_arguments[0] = 0; // str column
+ temp_arguments[1] = 1; // pos
+ temp_arguments[2] = 2; // width
+ size_t result_column_id = 3;
+
+ SubstringUtil::substring_execute(temp_block, temp_arguments,
result_column_id,
+ temp_block.rows());
+ if (is_nullable) {
+ auto res_column = ColumnNullable::create(
+ temp_block.get_by_position(result_column_id).column,
null_map_column_ptr);
+ return {std::move(res_column),
+
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+ column_with_type_and_name.name};
+ } else {
+ auto res_column =
temp_block.get_by_position(result_column_id).column;
+ return {std::move(res_column),
+
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+ column_with_type_and_name.name};
+ }
+ }
+
+private:
+ TypeDescriptor _source_type;
+ int _width;
+};
+
+class IntegerTruncatePartitionColumnTransform : public
PartitionColumnTransform {
+public:
+ IntegerTruncatePartitionColumnTransform(const TypeDescriptor& source_type,
int width)
+ : _source_type(source_type), _width(width) {}
+
+ std::string name() const override { return "IntegerTruncate"; }
+
+ const TypeDescriptor& get_result_type() const override { return
_source_type; }
+
+ ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+ //1) get the target column ptr
+ const ColumnWithTypeAndName& column_with_type_and_name =
block.get_by_position(column_pos);
+ ColumnPtr column_ptr =
column_with_type_and_name.column->convert_to_full_column_if_const();
+ CHECK(column_ptr != nullptr);
+
+ //2) get the input data from block
+ ColumnPtr null_map_column_ptr;
+ bool is_nullable = false;
+ if (column_ptr->is_nullable()) {
+ const ColumnNullable* nullable_column =
+ reinterpret_cast<const
vectorized::ColumnNullable*>(column_ptr.get());
+ is_nullable = true;
+ null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+ column_ptr = nullable_column->get_nested_column_ptr();
+ }
+ const auto& in_data = assert_cast<const
ColumnInt32*>(column_ptr.get())->get_data();
+
+ //3) do partition routing
+ auto col_res = ColumnInt32::create();
+ ColumnInt32::Container& out_data = col_res->get_data();
+ out_data.resize(in_data.size());
+ const int* end_in = in_data.data() + in_data.size();
+ const Int32* __restrict p_in = in_data.data();
+ Int32* __restrict p_out = out_data.data();
+
+ while (p_in < end_in) {
+ *p_out = *p_in - ((*p_in % _width) + _width) % _width;
+ ++p_in;
+ ++p_out;
+ }
+
+ //4) create the partition column and return
+ if (is_nullable) {
+ auto res_column = ColumnNullable::create(std::move(col_res),
null_map_column_ptr);
+ return {res_column,
+
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+ column_with_type_and_name.name};
+ } else {
+ return {std::move(col_res),
+
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+ column_with_type_and_name.name};
+ }
+ }
+
+private:
+ TypeDescriptor _source_type;
+ int _width;
+};
+
+class BigintTruncatePartitionColumnTransform : public PartitionColumnTransform
{
+public:
+ BigintTruncatePartitionColumnTransform(const TypeDescriptor& source_type,
int width)
+ : _source_type(source_type), _width(width) {}
+
+ std::string name() const override { return "BigintTruncate"; }
+
+ const TypeDescriptor& get_result_type() const override { return
_source_type; }
+
+ ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+ //1) get the target column ptr
+ const ColumnWithTypeAndName& column_with_type_and_name =
block.get_by_position(column_pos);
+ ColumnPtr column_ptr =
column_with_type_and_name.column->convert_to_full_column_if_const();
+ CHECK(column_ptr != nullptr);
+
+ //2) get the input data from block
+ ColumnPtr null_map_column_ptr;
+ bool is_nullable = false;
+ if (column_ptr->is_nullable()) {
+ const ColumnNullable* nullable_column =
+ reinterpret_cast<const
vectorized::ColumnNullable*>(column_ptr.get());
+ is_nullable = true;
+ null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+ column_ptr = nullable_column->get_nested_column_ptr();
+ }
+ const auto& in_data = assert_cast<const
ColumnInt64*>(column_ptr.get())->get_data();
+
+ //3) do partition routing
+ auto col_res = ColumnInt64::create();
+ ColumnInt64::Container& out_data = col_res->get_data();
+ out_data.resize(in_data.size());
+ const Int64* end_in = in_data.data() + in_data.size();
+ const Int64* __restrict p_in = in_data.data();
+ Int64* __restrict p_out = out_data.data();
+
+ while (p_in < end_in) {
+ *p_out = *p_in - ((*p_in % _width) + _width) % _width;
+ ++p_in;
+ ++p_out;
+ }
+
+ //4) create the partition column and return
+ if (is_nullable) {
+ auto res_column = ColumnNullable::create(std::move(col_res),
null_map_column_ptr);
+ return {res_column,
+
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+ column_with_type_and_name.name};
+ } else {
+ return {std::move(col_res),
+
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+ column_with_type_and_name.name};
+ }
+ }
+
+private:
+ TypeDescriptor _source_type;
+ int _width;
+};
+
+template <typename T>
+class DecimalTruncatePartitionColumnTransform : public
PartitionColumnTransform {
+public:
+ DecimalTruncatePartitionColumnTransform(const TypeDescriptor& source_type,
int width)
+ : _source_type(source_type), _width(width) {}
+
+ std::string name() const override { return "DecimalTruncate"; }
+
+ const TypeDescriptor& get_result_type() const override { return
_source_type; }
+
+ ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+ const ColumnWithTypeAndName& column_with_type_and_name =
block.get_by_position(column_pos);
+
+ ColumnPtr column_ptr;
+ ColumnPtr null_map_column_ptr;
+ bool is_nullable = false;
+ if (auto* nullable_column =
+
check_and_get_column<ColumnNullable>(column_with_type_and_name.column)) {
+ null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+ column_ptr = nullable_column->get_nested_column_ptr();
+ is_nullable = true;
+ } else {
+ column_ptr = column_with_type_and_name.column;
+ is_nullable = false;
+ }
+
+ const auto* const decimal_col =
check_and_get_column<ColumnDecimal<T>>(column_ptr);
+ const auto& vec_src = decimal_col->get_data();
+
+ auto col_res = ColumnDecimal<T>::create(vec_src.size(),
decimal_col->get_scale());
+ auto& vec_res = col_res->get_data();
+
+ const typename T::NativeType* __restrict p_in =
+ reinterpret_cast<const T::NativeType*>(vec_src.data());
+ const typename T::NativeType* end_in =
+ reinterpret_cast<const T::NativeType*>(vec_src.data()) +
vec_src.size();
+ typename T::NativeType* __restrict p_out =
reinterpret_cast<T::NativeType*>(vec_res.data());
+
+ while (p_in < end_in) {
+ typename T::NativeType remainder = ((*p_in % _width) + _width) %
_width;
+ *p_out = *p_in - remainder;
+ ++p_in;
+ ++p_out;
+ }
+
+ if (is_nullable) {
+ auto res_column = ColumnNullable::create(std::move(col_res),
null_map_column_ptr);
+ return {res_column,
+
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+ column_with_type_and_name.name};
+ } else {
+ return {std::move(col_res),
+
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+ column_with_type_and_name.name};
+ }
+ }
+
+private:
+ TypeDescriptor _source_type;
+ int _width;
+};
+
+class IntBucketPartitionColumnTransform : public PartitionColumnTransform {
+public:
+ IntBucketPartitionColumnTransform(const TypeDescriptor& source_type, int
bucket_num)
+ : _source_type(source_type), _bucket_num(bucket_num),
_target_type(TYPE_INT) {}
+
+ std::string name() const override { return "IntBucket"; }
+
+ const TypeDescriptor& get_result_type() const override { return
_target_type; }
+
+ ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+ //1) get the target column ptr
+ const ColumnWithTypeAndName& column_with_type_and_name =
block.get_by_position(column_pos);
+ ColumnPtr column_ptr =
column_with_type_and_name.column->convert_to_full_column_if_const();
+ CHECK(column_ptr != nullptr);
+
+ //2) get the input data from block
+ ColumnPtr null_map_column_ptr;
+ bool is_nullable = false;
+ if (column_ptr->is_nullable()) {
+ const ColumnNullable* nullable_column =
+ reinterpret_cast<const
vectorized::ColumnNullable*>(column_ptr.get());
+ is_nullable = true;
+ null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+ column_ptr = nullable_column->get_nested_column_ptr();
+ }
+ const auto& in_data = assert_cast<const
ColumnInt32*>(column_ptr.get())->get_data();
+
+ //3) do partition routing
+ auto col_res = ColumnInt32::create();
+ ColumnInt32::Container& out_data = col_res->get_data();
+ out_data.resize(in_data.size());
+ const int* end_in = in_data.data() + in_data.size();
+ const Int32* __restrict p_in = in_data.data();
+ Int32* __restrict p_out = out_data.data();
+
+ while (p_in < end_in) {
+ Int64 long_value = static_cast<Int64>(*p_in);
+ uint32_t hash_value = HashUtil::murmur_hash3_32(&long_value,
sizeof(long_value), 0);
+ // *p_out = ((hash_value >> 1) & INT32_MAX) %
_bucket_num;
+ *p_out = (hash_value & INT32_MAX) % _bucket_num;
+ ++p_in;
+ ++p_out;
+ }
+
+ //4) create the partition column and return
+ if (is_nullable) {
+ auto res_column = ColumnNullable::create(std::move(col_res),
null_map_column_ptr);
+ return {std::move(res_column),
+
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+ column_with_type_and_name.name};
+ } else {
+ return {std::move(col_res),
+
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+ column_with_type_and_name.name};
+ }
+ }
+
+private:
+ TypeDescriptor _source_type;
+ int _bucket_num;
+ TypeDescriptor _target_type;
+};
+
+class BigintBucketPartitionColumnTransform : public PartitionColumnTransform {
+public:
+ BigintBucketPartitionColumnTransform(const TypeDescriptor& source_type,
int bucket_num)
+ : _source_type(source_type), _bucket_num(bucket_num),
_target_type(TYPE_INT) {}
+
+ std::string name() const override { return "BigintBucket"; }
+
+ const TypeDescriptor& get_result_type() const override { return
_target_type; }
+
+ ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+ //1) get the target column ptr
+ const ColumnWithTypeAndName& column_with_type_and_name =
block.get_by_position(column_pos);
+ ColumnPtr column_ptr =
column_with_type_and_name.column->convert_to_full_column_if_const();
+ CHECK(column_ptr != nullptr);
+
+ //2) get the input data from block
+ ColumnPtr null_map_column_ptr;
+ bool is_nullable = false;
+ if (column_ptr->is_nullable()) {
+ const ColumnNullable* nullable_column =
+ reinterpret_cast<const
vectorized::ColumnNullable*>(column_ptr.get());
+ is_nullable = true;
+ null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+ column_ptr = nullable_column->get_nested_column_ptr();
+ }
+ const auto& in_data = assert_cast<const
ColumnInt64*>(column_ptr.get())->get_data();
+
+ //3) do partition routing
+ auto col_res = ColumnInt32::create();
+ ColumnInt32::Container& out_data = col_res->get_data();
+ out_data.resize(in_data.size());
+ const Int64* end_in = in_data.data() + in_data.size();
+ const Int64* __restrict p_in = in_data.data();
+ Int32* __restrict p_out = out_data.data();
+ while (p_in < end_in) {
+ Int64 long_value = static_cast<Int64>(*p_in);
+ uint32_t hash_value = HashUtil::murmur_hash3_32(&long_value,
sizeof(long_value), 0);
+ // int value = ((hash_value >> 1) & INT32_MAX) %
_bucket_num;
+ int value = (hash_value & INT32_MAX) % _bucket_num;
+ *p_out = value;
+ ++p_in;
+ ++p_out;
+ }
+
+ //4) create the partition column and return
+ if (is_nullable) {
+ auto res_column = ColumnNullable::create(std::move(col_res),
null_map_column_ptr);
+ return {std::move(res_column),
+
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+ column_with_type_and_name.name};
+ } else {
+ return {std::move(col_res),
+
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+ column_with_type_and_name.name};
+ }
+ }
+
+private:
+ TypeDescriptor _source_type;
+ int _bucket_num;
+ TypeDescriptor _target_type;
+};
+
+template <typename T>
+class DecimalBucketPartitionColumnTransform : public PartitionColumnTransform {
+public:
+ DecimalBucketPartitionColumnTransform(const TypeDescriptor& source_type,
int bucket_num)
+ : _source_type(source_type), _bucket_num(bucket_num),
_target_type(TYPE_INT) {}
+
+ std::string name() const override { return "DecimalBucket"; }
+
+ const TypeDescriptor& get_result_type() const override { return
_target_type; }
+
+ ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+ //1) get the target column ptr
+ const ColumnWithTypeAndName& column_with_type_and_name =
block.get_by_position(column_pos);
+ ColumnPtr column_ptr =
column_with_type_and_name.column->convert_to_full_column_if_const();
+ CHECK(column_ptr != nullptr);
+
+ //2) get the input data from block
+ ColumnPtr null_map_column_ptr;
+ bool is_nullable = false;
+ if (column_ptr->is_nullable()) {
+ const ColumnNullable* nullable_column =
+ reinterpret_cast<const
vectorized::ColumnNullable*>(column_ptr.get());
+ is_nullable = true;
+ null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+ column_ptr = nullable_column->get_nested_column_ptr();
+ }
+ const auto& in_data = assert_cast<const
ColumnDecimal<T>*>(column_ptr.get())->get_data();
+
+ //3) do partition routing
+ auto col_res = ColumnInt32::create();
+ ColumnInt32::Container& out_data = col_res->get_data();
+ out_data.resize(in_data.size());
+ auto& vec_res = col_res->get_data();
+
+ const typename T::NativeType* __restrict p_in =
+ reinterpret_cast<const T::NativeType*>(in_data.data());
+ const typename T::NativeType* end_in =
+ reinterpret_cast<const T::NativeType*>(in_data.data()) +
in_data.size();
+ typename T::NativeType* __restrict p_out =
reinterpret_cast<T::NativeType*>(vec_res.data());
+
+ while (p_in < end_in) {
+ std::string buffer = BitUtil::IntToByteBuffer(*p_in);
+ uint32_t hash_value = HashUtil::murmur_hash3_32(buffer.data(),
buffer.size(), 0);
+ *p_out = (hash_value & INT32_MAX) % _bucket_num;
+ ++p_in;
+ ++p_out;
+ }
+
+ //4) create the partition column and return
+ if (is_nullable) {
+ auto res_column = ColumnNullable::create(std::move(col_res),
null_map_column_ptr);
+ return {std::move(res_column),
+
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+ column_with_type_and_name.name};
+ } else {
+ return {std::move(col_res),
+
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+ column_with_type_and_name.name};
+ }
+ }
+
+ std::string to_human_string(const TypeDescriptor& type, const std::any&
value) const override {
+ return get_partition_value(type, value);
+ }
+
+ std::string get_partition_value(const TypeDescriptor& type,
+ const std::any& value) const override {
+ if (value.has_value()) {
+ return std::to_string(std::any_cast<Int32>(value));
+ } else {
+ return "null";
+ }
+ }
+
+private:
+ TypeDescriptor _source_type;
+ int _bucket_num;
+ TypeDescriptor _target_type;
+};
+
+class DateBucketPartitionColumnTransform : public PartitionColumnTransform {
+public:
+ DateBucketPartitionColumnTransform(const TypeDescriptor& source_type, int
bucket_num)
+ : _source_type(source_type), _bucket_num(bucket_num),
_target_type(TYPE_INT) {}
+
+ std::string name() const override { return "DateBucket"; }
+
+ const TypeDescriptor& get_result_type() const override { return
_target_type; }
+
+ ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+ //1) get the target column ptr
+ const ColumnWithTypeAndName& column_with_type_and_name =
block.get_by_position(column_pos);
+ ColumnPtr column_ptr =
column_with_type_and_name.column->convert_to_full_column_if_const();
+ CHECK(column_ptr != nullptr);
+
+ //2) get the input data from block
+ ColumnPtr null_map_column_ptr;
+ bool is_nullable = false;
+ if (column_ptr->is_nullable()) {
+ const ColumnNullable* nullable_column =
+ reinterpret_cast<const
vectorized::ColumnNullable*>(column_ptr.get());
+ is_nullable = true;
+ null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+ column_ptr = nullable_column->get_nested_column_ptr();
+ }
+ const auto& in_data = assert_cast<const
ColumnDateV2*>(column_ptr.get())->get_data();
+
+ //3) do partition routing
+ auto col_res = ColumnInt32::create();
+ ColumnInt32::Container& out_data = col_res->get_data();
+ out_data.resize(in_data.size());
+ const auto* end_in = in_data.data() + in_data.size();
+
+ const auto* __restrict p_in = in_data.data();
+ auto* __restrict p_out = out_data.data();
+
+ while (p_in < end_in) {
+ DateV2Value<DateV2ValueType> value =
+ binary_cast<uint32_t,
DateV2Value<DateV2ValueType>>(*(UInt32*)p_in);
+
+ int32_t days_from_unix_epoch = value.daynr() - 719528;
+ Int64 long_value = static_cast<Int64>(days_from_unix_epoch);
+ uint32_t hash_value = HashUtil::murmur_hash3_32(&long_value,
sizeof(long_value), 0);
+
+ *p_out = (hash_value & INT32_MAX) % _bucket_num;
+ ++p_in;
+ ++p_out;
+ }
+
+ //4) create the partition column and return
+ if (is_nullable) {
+ auto res_column = ColumnNullable::create(std::move(col_res),
null_map_column_ptr);
+ return {std::move(res_column),
+
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+ column_with_type_and_name.name};
+ } else {
+ return {std::move(col_res),
+
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+ column_with_type_and_name.name};
+ }
+ }
+
+private:
+ TypeDescriptor _source_type;
+ int _bucket_num;
+ TypeDescriptor _target_type;
+};
+
+class TimestampBucketPartitionColumnTransform : public
PartitionColumnTransform {
+public:
+ TimestampBucketPartitionColumnTransform(const TypeDescriptor& source_type,
int bucket_num)
+ : _source_type(source_type), _bucket_num(bucket_num),
_target_type(TYPE_INT) {}
+
+ std::string name() const override { return "TimestampBucket"; }
+
+ const TypeDescriptor& get_result_type() const override { return
_target_type; }
+
+ ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+ //1) get the target column ptr
+ const ColumnWithTypeAndName& column_with_type_and_name =
block.get_by_position(column_pos);
+ ColumnPtr column_ptr =
column_with_type_and_name.column->convert_to_full_column_if_const();
+ CHECK(column_ptr != nullptr);
+
+ //2) get the input data from block
+ ColumnPtr null_map_column_ptr;
+ bool is_nullable = false;
+ if (column_ptr->is_nullable()) {
+ const ColumnNullable* nullable_column =
+ reinterpret_cast<const
vectorized::ColumnNullable*>(column_ptr.get());
+ is_nullable = true;
+ null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+ column_ptr = nullable_column->get_nested_column_ptr();
+ }
+ const auto& in_data = assert_cast<const
ColumnDateTimeV2*>(column_ptr.get())->get_data();
+
+ //3) do partition routing
+ auto col_res = ColumnInt32::create();
+ ColumnInt32::Container& out_data = col_res->get_data();
+ out_data.resize(in_data.size());
+ const auto* end_in = in_data.data() + in_data.size();
+
+ const auto* __restrict p_in = in_data.data();
+ auto* __restrict p_out = out_data.data();
+
+ while (p_in < end_in) {
+ DateV2Value<DateTimeV2ValueType> value =
+ binary_cast<uint64_t,
DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in);
+
+ int64_t timestamp;
+ if (!value.unix_timestamp(&timestamp, "UTC")) {
+ LOG(WARNING) << "Failed to call unix_timestamp :" << value.debug_string();
+ timestamp = 0;
+ }
+ Int64 long_value = static_cast<Int64>(timestamp) * 1000000;
+ uint32_t hash_value = HashUtil::murmur_hash3_32(&long_value,
sizeof(long_value), 0);
+
+ *p_out = (hash_value & INT32_MAX) % _bucket_num;
+ ++p_in;
+ ++p_out;
+ }
+
+ //4) create the partition column and return
+ if (is_nullable) {
+ auto res_column = ColumnNullable::create(std::move(col_res),
null_map_column_ptr);
+ return {std::move(res_column),
+
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+ column_with_type_and_name.name};
+ } else {
+ return {std::move(col_res),
+
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+ column_with_type_and_name.name};
+ }
+ }
+
+ std::string to_human_string(const TypeDescriptor& type, const std::any&
value) const override {
+ if (value.has_value()) {
+ return std::to_string(std::any_cast<Int32>(value));
+ ;
+ } else {
+ return "null";
+ }
+ }
+
+private:
+ TypeDescriptor _source_type;
+ int _bucket_num;
+ TypeDescriptor _target_type;
+};
+
+class StringBucketPartitionColumnTransform : public PartitionColumnTransform {
+public:
+ StringBucketPartitionColumnTransform(const TypeDescriptor& source_type,
int bucket_num)
+ : _source_type(source_type), _bucket_num(bucket_num),
_target_type(TYPE_INT) {}
+
+ std::string name() const override { return "StringBucket"; }
+
+ const TypeDescriptor& get_result_type() const override { return
_target_type; }
+
+ ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+ //1) get the target column ptr
+ const ColumnWithTypeAndName& column_with_type_and_name =
block.get_by_position(column_pos);
+ ColumnPtr column_ptr =
column_with_type_and_name.column->convert_to_full_column_if_const();
+ CHECK(column_ptr != nullptr);
+
+ //2) get the input data from block
+ ColumnPtr null_map_column_ptr;
+ bool is_nullable = false;
+ if (column_ptr->is_nullable()) {
+ const ColumnNullable* nullable_column =
+ reinterpret_cast<const
vectorized::ColumnNullable*>(column_ptr.get());
+ is_nullable = true;
+ null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+ column_ptr = nullable_column->get_nested_column_ptr();
+ }
+ const auto* str_col = assert_cast<const
ColumnString*>(column_ptr.get());
+
+ //3) do partition routing
+ auto col_res = ColumnInt32::create();
+ const auto& data = str_col->get_chars();
+ const auto& offsets = str_col->get_offsets();
+
+ size_t offset_size = offsets.size();
+ ColumnInt32::Container& out_data = col_res->get_data();
+ out_data.resize(offset_size);
+ auto* __restrict p_out = out_data.data();
+
+ for (int i = 0; i < offset_size; i++) {
+ const unsigned char* raw_str = &data[offsets[i - 1]];
+ ColumnString::Offset size = offsets[i] - offsets[i - 1];
+ uint32_t hash_value = HashUtil::murmur_hash3_32(raw_str, size, 0);
+
+ *p_out = (hash_value & INT32_MAX) % _bucket_num;
+ ++p_out;
+ }
+
+ //4) create the partition column and return
+ if (is_nullable) {
+ auto res_column = ColumnNullable::create(std::move(col_res),
null_map_column_ptr);
+ return {std::move(res_column),
+
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+ column_with_type_and_name.name};
+ } else {
+ return {std::move(col_res),
+
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+ column_with_type_and_name.name};
+ }
+ }
+
+private:
+ TypeDescriptor _source_type;
+ int _bucket_num;
+ TypeDescriptor _target_type;
+};
+
+class DateYearPartitionColumnTransform : public PartitionColumnTransform {
+public:
+ DateYearPartitionColumnTransform(const TypeDescriptor& source_type)
+ : _source_type(source_type), _target_type(TYPE_INT) {}
+
+ std::string name() const override { return "DateYear"; }
+
+ const TypeDescriptor& get_result_type() const override { return
_target_type; }
+
+ ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+ //1) get the target column ptr
+ const ColumnWithTypeAndName& column_with_type_and_name =
block.get_by_position(column_pos);
+ ColumnPtr column_ptr =
column_with_type_and_name.column->convert_to_full_column_if_const();
+ CHECK(column_ptr != nullptr);
+
+ //2) get the input data from block
+ ColumnPtr null_map_column_ptr;
+ bool is_nullable = false;
+ if (column_ptr->is_nullable()) {
+ const ColumnNullable* nullable_column =
+ reinterpret_cast<const
vectorized::ColumnNullable*>(column_ptr.get());
+ is_nullable = true;
+ null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+ column_ptr = nullable_column->get_nested_column_ptr();
+ }
+ const auto& in_data = assert_cast<const
ColumnDateV2*>(column_ptr.get())->get_data();
+
+ //3) do partition routing
+ auto col_res = ColumnInt32::create();
+ ColumnInt32::Container& out_data = col_res->get_data();
+ out_data.resize(in_data.size());
+
+ const auto* end_in = in_data.data() + in_data.size();
+ const auto* __restrict p_in = in_data.data();
+ auto* __restrict p_out = out_data.data();
+
+ while (p_in < end_in) {
+ DateV2Value<DateV2ValueType> value =
+ binary_cast<uint32_t,
DateV2Value<DateV2ValueType>>(*(UInt32*)p_in);
+ *p_out =
datetime_diff<YEAR>(PartitionColumnTransformUtils::epoch_date(), value);
+ ++p_in;
+ ++p_out;
+ }
+
+ //4) create the partition column and return
+ if (is_nullable) {
+ auto res_column = ColumnNullable::create(std::move(col_res),
null_map_column_ptr);
+ return {std::move(res_column),
+
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+ column_with_type_and_name.name};
+ } else {
+ return {std::move(col_res),
+
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+ column_with_type_and_name.name};
+ }
+ }
+
+ std::string to_human_string(const TypeDescriptor& type, const std::any&
value) const override {
+ if (value.has_value()) {
+ return
PartitionColumnTransformUtils::human_year(std::any_cast<Int32>(value));
+ } else {
+ return "null";
+ }
+ }
+
+private:
+ TypeDescriptor _source_type;
+ TypeDescriptor _target_type;
+};
+
+class TimestampYearPartitionColumnTransform : public PartitionColumnTransform {
+public:
+ TimestampYearPartitionColumnTransform(const TypeDescriptor& source_type)
+ : _source_type(source_type), _target_type(TYPE_INT) {}
+
+ std::string name() const override { return "TimestampYear"; }
+
+ const TypeDescriptor& get_result_type() const override { return
_target_type; }
+
+ ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+ //1) get the target column ptr
+ const ColumnWithTypeAndName& column_with_type_and_name =
block.get_by_position(column_pos);
+ ColumnPtr column_ptr =
column_with_type_and_name.column->convert_to_full_column_if_const();
+ CHECK(column_ptr != nullptr);
+
+ //2) get the input data from block
+ ColumnPtr null_map_column_ptr;
+ bool is_nullable = false;
+ if (column_ptr->is_nullable()) {
+ const ColumnNullable* nullable_column =
+ reinterpret_cast<const
vectorized::ColumnNullable*>(column_ptr.get());
+ is_nullable = true;
+ null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+ column_ptr = nullable_column->get_nested_column_ptr();
+ }
+ const auto& in_data = assert_cast<const
ColumnDateTimeV2*>(column_ptr.get())->get_data();
+
+ //3) do partition routing
+ auto col_res = ColumnInt32::create();
+ ColumnInt32::Container& out_data = col_res->get_data();
+ out_data.resize(in_data.size());
+
+ const auto* end_in = in_data.data() + in_data.size();
+ const auto* __restrict p_in = in_data.data();
+ auto* __restrict p_out = out_data.data();
+
+ while (p_in < end_in) {
+ DateV2Value<DateTimeV2ValueType> value =
+ binary_cast<uint64_t,
DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in);
+ *p_out =
datetime_diff<YEAR>(PartitionColumnTransformUtils::epoch_datetime(), value);
+ ++p_in;
+ ++p_out;
+ }
+
+ //4) create the partition column and return
+ if (is_nullable) {
+ auto res_column = ColumnNullable::create(std::move(col_res),
null_map_column_ptr);
+ return {std::move(res_column),
+
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+ column_with_type_and_name.name};
+ } else {
+ return {std::move(col_res),
+
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+ column_with_type_and_name.name};
+ }
+ }
+
+ std::string to_human_string(const TypeDescriptor& type, const std::any&
value) const override {
+ if (value.has_value()) {
+ return
PartitionColumnTransformUtils::human_year(std::any_cast<Int32>(value));
+ } else {
+ return "null";
+ }
+ }
+
+private:
+ TypeDescriptor _source_type;
+ TypeDescriptor _target_type;
+};
+
+class DateMonthPartitionColumnTransform : public PartitionColumnTransform {
+public:
+ DateMonthPartitionColumnTransform(const TypeDescriptor& source_type)
+ : _source_type(source_type), _target_type(TYPE_INT) {}
+
+ std::string name() const override { return "DateMonth"; }
+
+ const TypeDescriptor& get_result_type() const override { return
_target_type; }
+
+ ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+ //1) get the target column ptr
+ const ColumnWithTypeAndName& column_with_type_and_name =
block.get_by_position(column_pos);
+ ColumnPtr column_ptr =
column_with_type_and_name.column->convert_to_full_column_if_const();
+ CHECK(column_ptr != nullptr);
+
+ //2) get the input data from block
+ ColumnPtr null_map_column_ptr;
+ bool is_nullable = false;
+ if (column_ptr->is_nullable()) {
+ const ColumnNullable* nullable_column =
+ reinterpret_cast<const
vectorized::ColumnNullable*>(column_ptr.get());
+ is_nullable = true;
+ null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+ column_ptr = nullable_column->get_nested_column_ptr();
+ }
+ const auto& in_data = assert_cast<const
ColumnDateV2*>(column_ptr.get())->get_data();
+
+ //3) do partition routing
+ auto col_res = ColumnInt32::create();
+ ColumnInt32::Container& out_data = col_res->get_data();
+ out_data.resize(in_data.size());
+
+ const auto* end_in = in_data.data() + in_data.size();
+ const auto* __restrict p_in = in_data.data();
+ auto* __restrict p_out = out_data.data();
+
+ while (p_in < end_in) {
+ DateV2Value<DateV2ValueType> value =
+ binary_cast<uint32_t,
DateV2Value<DateV2ValueType>>(*(UInt32*)p_in);
+ *p_out =
datetime_diff<MONTH>(PartitionColumnTransformUtils::epoch_date(), value);
+ ++p_in;
+ ++p_out;
+ }
+
+ //4) create the partition column and return
+ if (is_nullable) {
+ auto res_column = ColumnNullable::create(std::move(col_res),
null_map_column_ptr);
+ return {std::move(res_column),
+
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+ column_with_type_and_name.name};
+ } else {
+ return {std::move(col_res),
+
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+ column_with_type_and_name.name};
+ }
+ }
+
+ std::string to_human_string(const TypeDescriptor& type, const std::any&
value) const override {
+ if (value.has_value()) {
+ return
PartitionColumnTransformUtils::human_month(std::any_cast<Int32>(value));
+ } else {
+ return "null";
+ }
+ }
+
+private:
+ TypeDescriptor _source_type;
+ TypeDescriptor _target_type;
+};
+
+class TimestampMonthPartitionColumnTransform : public PartitionColumnTransform
{
+public:
+ TimestampMonthPartitionColumnTransform(const TypeDescriptor& source_type)
+ : _source_type(source_type), _target_type(TYPE_INT) {}
+
+ std::string name() const override { return "TimestampMonth"; }
+
+ const TypeDescriptor& get_result_type() const override { return
_target_type; }
+
+ ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+ //1) get the target column ptr
+ const ColumnWithTypeAndName& column_with_type_and_name =
block.get_by_position(column_pos);
+ ColumnPtr column_ptr =
column_with_type_and_name.column->convert_to_full_column_if_const();
+ CHECK(column_ptr != nullptr);
+
+ //2) get the input data from block
+ ColumnPtr null_map_column_ptr;
+ bool is_nullable = false;
+ if (column_ptr->is_nullable()) {
+ const ColumnNullable* nullable_column =
+ reinterpret_cast<const
vectorized::ColumnNullable*>(column_ptr.get());
+ is_nullable = true;
+ null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+ column_ptr = nullable_column->get_nested_column_ptr();
+ }
+ const auto& in_data = assert_cast<const
ColumnDateTimeV2*>(column_ptr.get())->get_data();
+
+ //3) do partition routing
+ auto col_res = ColumnInt32::create();
+ ColumnInt32::Container& out_data = col_res->get_data();
+ out_data.resize(in_data.size());
+
+ const auto* end_in = in_data.data() + in_data.size();
+ const auto* __restrict p_in = in_data.data();
+ auto* __restrict p_out = out_data.data();
+
+ while (p_in < end_in) {
+ DateV2Value<DateTimeV2ValueType> value =
+ binary_cast<uint64_t,
DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in);
+ *p_out =
datetime_diff<MONTH>(PartitionColumnTransformUtils::epoch_datetime(), value);
+ ++p_in;
+ ++p_out;
+ }
+
+ //4) create the partition column and return
+ if (is_nullable) {
+ auto res_column = ColumnNullable::create(std::move(col_res),
null_map_column_ptr);
+ return {std::move(res_column),
+
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+ column_with_type_and_name.name};
+ } else {
+ return {std::move(col_res),
+
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+ column_with_type_and_name.name};
+ }
+ }
+
+ std::string to_human_string(const TypeDescriptor& type, const std::any&
value) const override {
+ if (value.has_value()) {
+ return
PartitionColumnTransformUtils::human_month(std::any_cast<Int32>(value));
+ } else {
+ return "null";
+ }
+ }
+
+private:
+ TypeDescriptor _source_type;
+ TypeDescriptor _target_type;
+};
+
+class DateDayPartitionColumnTransform : public PartitionColumnTransform {
+public:
+ DateDayPartitionColumnTransform(const TypeDescriptor& source_type)
+ : _source_type(source_type), _target_type(TYPE_INT) {}
+
+ std::string name() const override { return "DateDay"; }
+
+ const TypeDescriptor& get_result_type() const override { return
_target_type; }
+
+ ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+ //1) get the target column ptr
+ const ColumnWithTypeAndName& column_with_type_and_name =
block.get_by_position(column_pos);
+ ColumnPtr column_ptr =
column_with_type_and_name.column->convert_to_full_column_if_const();
+ CHECK(column_ptr != nullptr);
+
+ //2) get the input data from block
+ ColumnPtr null_map_column_ptr;
+ bool is_nullable = false;
+ if (column_ptr->is_nullable()) {
+ const ColumnNullable* nullable_column =
+ reinterpret_cast<const
vectorized::ColumnNullable*>(column_ptr.get());
+ is_nullable = true;
+ null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+ column_ptr = nullable_column->get_nested_column_ptr();
+ }
+ const auto& in_data = assert_cast<const
ColumnDateV2*>(column_ptr.get())->get_data();
+
+ //3) do partition routing
+ auto col_res = ColumnInt32::create();
+ ColumnInt32::Container& out_data = col_res->get_data();
+ out_data.resize(in_data.size());
+
+ const auto* end_in = in_data.data() + in_data.size();
+ const auto* __restrict p_in = in_data.data();
+ auto* __restrict p_out = out_data.data();
+
+ while (p_in < end_in) {
+ DateV2Value<DateV2ValueType> value =
+ binary_cast<uint32_t,
DateV2Value<DateV2ValueType>>(*(UInt32*)p_in);
+ *p_out =
datetime_diff<DAY>(PartitionColumnTransformUtils::epoch_date(), value);
+ ++p_in;
+ ++p_out;
+ }
+
+ //4) create the partition column and return
+ if (is_nullable) {
+ auto res_column = ColumnNullable::create(std::move(col_res),
null_map_column_ptr);
+ return {std::move(res_column),
+
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+ column_with_type_and_name.name};
+ } else {
+ return {std::move(col_res),
+
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+ column_with_type_and_name.name};
+ }
+ }
+
+ std::string to_human_string(const TypeDescriptor& type, const std::any&
value) const override {
+ return get_partition_value(type, value);
+ }
+
+ std::string get_partition_value(const TypeDescriptor& type,
+ const std::any& value) const override {
+ if (value.has_value()) {
+ int day_value = std::any_cast<Int32>(value);
+ return PartitionColumnTransformUtils::human_day(day_value);
+ } else {
+ return "null";
+ }
+ }
+
+private:
+ TypeDescriptor _source_type;
+ TypeDescriptor _target_type;
+};
+
+class TimestampDayPartitionColumnTransform : public PartitionColumnTransform {
+public:
+ TimestampDayPartitionColumnTransform(const TypeDescriptor& source_type)
+ : _source_type(source_type), _target_type(TYPE_INT) {}
+
+ std::string name() const override { return "TimestampDay"; }
+
+ const TypeDescriptor& get_result_type() const override { return
_target_type; }
+
+ ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+ //1) get the target column ptr
+ const ColumnWithTypeAndName& column_with_type_and_name =
block.get_by_position(column_pos);
+ ColumnPtr column_ptr =
column_with_type_and_name.column->convert_to_full_column_if_const();
+ CHECK(column_ptr != nullptr);
+
+ //2) get the input data from block
+ ColumnPtr null_map_column_ptr;
+ bool is_nullable = false;
+ if (column_ptr->is_nullable()) {
+ const ColumnNullable* nullable_column =
+ reinterpret_cast<const
vectorized::ColumnNullable*>(column_ptr.get());
+ is_nullable = true;
+ null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+ column_ptr = nullable_column->get_nested_column_ptr();
+ }
+ const auto& in_data = assert_cast<const
ColumnDateTimeV2*>(column_ptr.get())->get_data();
+
+ //3) do partition routing
+ auto col_res = ColumnInt32::create();
+ ColumnInt32::Container& out_data = col_res->get_data();
+ out_data.resize(in_data.size());
+
+ const auto* end_in = in_data.data() + in_data.size();
+ const auto* __restrict p_in = in_data.data();
+ auto* __restrict p_out = out_data.data();
+
+ while (p_in < end_in) {
+ DateV2Value<DateTimeV2ValueType> value =
+ binary_cast<uint64_t,
DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in);
+ *p_out =
datetime_diff<DAY>(PartitionColumnTransformUtils::epoch_datetime(), value);
+ ++p_in;
+ ++p_out;
+ }
+
+ //4) create the partition column and return
+ if (is_nullable) {
+ auto res_column = ColumnNullable::create(std::move(col_res),
null_map_column_ptr);
+ return {std::move(res_column),
+
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+ column_with_type_and_name.name};
+ } else {
+ return {std::move(col_res),
+
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+ column_with_type_and_name.name};
+ }
+ }
+
+ std::string to_human_string(const TypeDescriptor& type, const std::any&
value) const override {
+ return get_partition_value(type, value);
+ }
+
+ std::string get_partition_value(const TypeDescriptor& type,
+ const std::any& value) const override {
+ if (value.has_value()) {
+ return
PartitionColumnTransformUtils::human_day(std::any_cast<Int32>(value));
+ } else {
+ return "null";
+ }
+ }
+
+private:
+ TypeDescriptor _source_type;
+ TypeDescriptor _target_type;
+};
+
+class TimestampHourPartitionColumnTransform : public PartitionColumnTransform {
+public:
+ TimestampHourPartitionColumnTransform(const TypeDescriptor& source_type)
+ : _source_type(source_type), _target_type(TYPE_INT) {}
+
+ std::string name() const override { return "TimestampHour"; }
+
+ const TypeDescriptor& get_result_type() const override { return
_target_type; }
+
+ ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+ //1) get the target column ptr
+ const ColumnWithTypeAndName& column_with_type_and_name =
block.get_by_position(column_pos);
+ ColumnPtr column_ptr =
column_with_type_and_name.column->convert_to_full_column_if_const();
+ CHECK(column_ptr != nullptr);
+
+ //2) get the input data from block
+ ColumnPtr null_map_column_ptr;
+ bool is_nullable = false;
+ if (column_ptr->is_nullable()) {
+ const ColumnNullable* nullable_column =
+ reinterpret_cast<const
vectorized::ColumnNullable*>(column_ptr.get());
+ is_nullable = true;
+ null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+ column_ptr = nullable_column->get_nested_column_ptr();
+ }
+ const auto& in_data = assert_cast<const
ColumnDateTimeV2*>(column_ptr.get())->get_data();
+
+ //3) do partition routing
+ auto col_res = ColumnInt32::create();
+ ColumnInt32::Container& out_data = col_res->get_data();
+ out_data.resize(in_data.size());
+
+ const auto* end_in = in_data.data() + in_data.size();
+ const auto* __restrict p_in = in_data.data();
+ auto* __restrict p_out = out_data.data();
+
+ while (p_in < end_in) {
+ DateV2Value<DateTimeV2ValueType> value =
+ binary_cast<uint64_t,
DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in);
+ *p_out =
datetime_diff<HOUR>(PartitionColumnTransformUtils::epoch_datetime(), value);
+ ++p_in;
+ ++p_out;
+ }
+
+ //4) create the partition column and return
+ if (is_nullable) {
+ auto res_column = ColumnNullable::create(std::move(col_res),
null_map_column_ptr);
+ return {std::move(res_column),
+
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+ column_with_type_and_name.name};
+ } else {
+ return {std::move(col_res),
+
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+ column_with_type_and_name.name};
+ }
+ }
+
+ std::string to_human_string(const TypeDescriptor& type, const std::any&
value) const override {
+ if (value.has_value()) {
+ return
PartitionColumnTransformUtils::human_hour(std::any_cast<Int32>(value));
+ } else {
+ return "null";
+ }
+ }
+
+private:
+ TypeDescriptor _source_type;
+ TypeDescriptor _target_type;
+};
+
+class VoidPartitionColumnTransform : public PartitionColumnTransform {
+public:
+ VoidPartitionColumnTransform(const TypeDescriptor& source_type)
+ : _source_type(source_type), _target_type(source_type) {}
+
+ std::string name() const override { return "Void"; }
+
+ const TypeDescriptor& get_result_type() const override { return
_target_type; }
+
+ ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+ const ColumnWithTypeAndName& column_with_type_and_name =
block.get_by_position(column_pos);
- virtual ColumnWithTypeAndName apply(Block& block, int idx);
+ ColumnPtr column_ptr;
+ ColumnPtr null_map_column_ptr;
+ if (auto* nullable_column =
+
check_and_get_column<ColumnNullable>(column_with_type_and_name.column)) {
+ null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+ column_ptr = nullable_column->get_nested_column_ptr();
+ } else {
+ column_ptr = column_with_type_and_name.column;
+ }
+ auto res_column = ColumnNullable::create(std::move(column_ptr),
+
ColumnUInt8::create(column_ptr->size(), 1));
+ return {std::move(res_column),
+
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+ column_with_type_and_name.name};
+ }
private:
TypeDescriptor _source_type;
+ TypeDescriptor _target_type;
};
} // namespace vectorized
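For context on the expected values in the unit tests further below: the year, month, day and hour transforms all count whole units elapsed since the Unix epoch (1970-01-01 00:00:00). A minimal illustrative sketch of the same arithmetic in plain Java with java.time (not part of the patch; the class name EpochUnitsSketch is made up, and it only covers timestamps at or after the epoch):

    import java.time.LocalDateTime;
    import java.time.temporal.ChronoUnit;

    public class EpochUnitsSketch {
        public static void main(String[] args) {
            LocalDateTime epoch = LocalDateTime.of(1970, 1, 1, 0, 0);
            LocalDateTime ts = LocalDateTime.of(2017, 11, 16, 22, 31, 8);
            // Whole units between the epoch and the timestamp.
            System.out.println(ChronoUnit.YEARS.between(epoch, ts));  // 47
            System.out.println(ChronoUnit.MONTHS.between(epoch, ts)); // 574
            System.out.println(ChronoUnit.DAYS.between(epoch, ts));   // 17486
            System.out.println(ChronoUnit.HOURS.between(epoch, ts));  // 419686
        }
    }

These values match the expected_data entries in partition_transformers_test.cpp for 2017-11-16 22:31:08.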
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
index 2703330406c..e59b0593f7b 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -329,8 +329,8 @@ std::vector<std::string>
VIcebergTableWriter::_partition_values(
TypeDescriptor result_type =
iceberg_partition_column.partition_column_transform().get_result_type();
partition_values.emplace_back(
-
iceberg_partition_column.partition_column_transform().to_human_string(result_type,
-
data.get(i)));
+
iceberg_partition_column.partition_column_transform().get_partition_value(
+ result_type, data.get(i)));
}
return partition_values;
@@ -407,21 +407,25 @@ std::optional<PartitionData>
VIcebergTableWriter::_get_partition_data(
std::any VIcebergTableWriter::_get_iceberg_partition_value(
const TypeDescriptor& type_desc, const ColumnWithTypeAndName&
partition_column,
int position) {
- ColumnPtr column;
- if (auto* nullable_column =
check_and_get_column<ColumnNullable>(*partition_column.column)) {
+ //1) get the partition column ptr
+ ColumnPtr col_ptr =
partition_column.column->convert_to_full_column_if_const();
+ CHECK(col_ptr != nullptr);
+ if (col_ptr->is_nullable()) {
+ const ColumnNullable* nullable_column =
+ reinterpret_cast<const
vectorized::ColumnNullable*>(col_ptr.get());
auto* __restrict null_map_data =
nullable_column->get_null_map_data().data();
if (null_map_data[position]) {
return std::any();
}
- column = nullable_column->get_nested_column_ptr();
- } else {
- column = partition_column.column;
+ col_ptr = nullable_column->get_nested_column_ptr();
}
- auto [item, size] = column->get_data_at(position);
+
+ //2) get the partition field data from the partition block
+ auto [item, size] = col_ptr->get_data_at(position);
switch (type_desc.type) {
case TYPE_BOOLEAN: {
vectorized::Field field =
- vectorized::check_and_get_column<const
ColumnUInt8>(*column)->operator[](position);
+ vectorized::check_and_get_column<const
ColumnUInt8>(*col_ptr)->operator[](position);
return field.get<bool>();
}
case TYPE_TINYINT: {
diff --git a/be/test/vec/sink/writer/iceberg/partition_transformers_test.cpp
b/be/test/vec/sink/writer/iceberg/partition_transformers_test.cpp
new file mode 100644
index 00000000000..a8df4f60d83
--- /dev/null
+++ b/be/test/vec/sink/writer/iceberg/partition_transformers_test.cpp
@@ -0,0 +1,489 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/writer/iceberg/partition_transformers.h"
+
+#include <gtest/gtest.h>
+
+#include "vec/data_types/data_type_time_v2.h"
+
+namespace doris::vectorized {
+
+class PartitionTransformersTest : public testing::Test {
+public:
+ PartitionTransformersTest() = default;
+ virtual ~PartitionTransformersTest() = default;
+};
+
+TEST_F(PartitionTransformersTest, test_integer_truncate_transform) {
+ const std::vector<int32_t> values({1, -1});
+ auto column = ColumnInt32::create();
+ column->insert_many_fix_len_data(reinterpret_cast<const
char*>(values.data()), values.size());
+ ColumnWithTypeAndName test_int(column->get_ptr(),
std::make_shared<DataTypeInt32>(),
+ "test_int");
+
+ Block block({test_int});
+ TypeDescriptor source_type(PrimitiveType::TYPE_INT);
+ IntegerTruncatePartitionColumnTransform transform(source_type, 10);
+
+ auto result = transform.apply(block, 0);
+
+ const auto& result_data = assert_cast<const
ColumnInt32*>(result.column.get())->get_data();
+ std::vector<int32_t> expected_data = {0, -10};
+ EXPECT_EQ(expected_data.size(), result_data.size());
+ for (size_t i = 0; i < result_data.size(); ++i) {
+ EXPECT_EQ(expected_data[i], result_data[i]);
+ }
+}
+
+TEST_F(PartitionTransformersTest, test_bigint_truncate_transform) {
+ const std::vector<int64_t> values({1, -1});
+ auto column = ColumnInt64::create();
+ column->insert_many_fix_len_data(reinterpret_cast<const
char*>(values.data()), values.size());
+ ColumnWithTypeAndName test_bigint(column->get_ptr(),
std::make_shared<DataTypeInt64>(),
+ "test_bigint");
+
+ Block block({test_bigint});
+ TypeDescriptor source_type(PrimitiveType::TYPE_BIGINT);
+ BigintTruncatePartitionColumnTransform transform(source_type, 10);
+
+ auto result = transform.apply(block, 0);
+
+ const auto& result_data = assert_cast<const
ColumnInt64*>(result.column.get())->get_data();
+ std::vector<int64_t> expected_data = {0, -10};
+ EXPECT_EQ(expected_data.size(), result_data.size());
+ for (size_t i = 0; i < result_data.size(); ++i) {
+ EXPECT_EQ(expected_data[i], result_data[i]);
+ }
+}
+
+TEST_F(PartitionTransformersTest, test_decimal32_truncate_transform) {
+ const std::vector<int32_t> values({1065});
+ auto column = ColumnDecimal32::create(0, 2);
+ column->insert_many_fix_len_data(reinterpret_cast<const
char*>(values.data()), values.size());
+ ColumnWithTypeAndName test_decimal32(column->get_ptr(),
+
std::make_shared<DataTypeDecimal<Decimal32>>(4, 2),
+ "test_decimal32");
+
+ Block block({test_decimal32});
+ TypeDescriptor source_type = TypeDescriptor::create_decimalv3_type(4, 2);
+ DecimalTruncatePartitionColumnTransform<Decimal32> transform(source_type,
50);
+
+ auto result = transform.apply(block, 0);
+
+ const auto& result_data = assert_cast<const
ColumnDecimal32*>(result.column.get())->get_data();
+ std::vector<int32_t> expected_data = {1050};
+ EXPECT_EQ(expected_data.size(), result_data.size());
+ for (size_t i = 0; i < result_data.size(); ++i) {
+ EXPECT_EQ(expected_data[i], result_data[i].value);
+ }
+}
+
+TEST_F(PartitionTransformersTest, test_string_truncate_transform) {
+ const std::vector<StringRef> values({{"iceberg", sizeof("iceberg") - 1}});
+ auto column = ColumnString::create();
+ column->insert_many_strings(&values[0], values.size());
+ ColumnWithTypeAndName test_string(column->get_ptr(),
std::make_shared<DataTypeString>(),
+ "test_string");
+
+ Block block({test_string});
+ TypeDescriptor source_type = TypeDescriptor::create_string_type();
+ StringTruncatePartitionColumnTransform transform(source_type, 3);
+
+ auto result = transform.apply(block, 0);
+ const auto result_column = assert_cast<const
ColumnString*>(result.column.get());
+ const char result_data[] = {'i', 'c', 'e'};
+ std::vector<StringRef> expected_data = {
+ {result_data, sizeof(result_data) / sizeof(result_data[0])}};
+ EXPECT_EQ(expected_data.size(), result_column->size());
+ for (size_t i = 0; i < result_column->size(); ++i) {
+ EXPECT_EQ(expected_data[i], result_column->get_data_at(i));
+ }
+}
+
+TEST_F(PartitionTransformersTest, test_integer_bucket_transform) {
+ const std::vector<int32_t> values({34, -123}); // 2017239379, -471378254
+ auto column = ColumnInt32::create();
+ column->insert_many_fix_len_data(reinterpret_cast<const
char*>(values.data()), values.size());
+ ColumnWithTypeAndName test_int(column->get_ptr(),
std::make_shared<DataTypeInt32>(),
+ "test_int");
+
+ Block block({test_int});
+ TypeDescriptor source_type(PrimitiveType::TYPE_INT);
+ IntBucketPartitionColumnTransform transform(source_type, 16);
+
+ auto result = transform.apply(block, 0);
+
+ const auto& result_data = assert_cast<const
ColumnInt32*>(result.column.get())->get_data();
+ std::vector<int32_t> expected_data = {3, 2};
+ EXPECT_EQ(expected_data.size(), result_data.size());
+ for (size_t i = 0; i < result_data.size(); ++i) {
+ EXPECT_EQ(expected_data[i], result_data[i]);
+ }
+}
+
+TEST_F(PartitionTransformersTest, test_bigint_bucket_transform) {
+ const std::vector<int64_t> values({34, -123}); // 2017239379, -471378254
+ auto column = ColumnInt64::create();
+ column->insert_many_fix_len_data(reinterpret_cast<const
char*>(values.data()), values.size());
+ ColumnWithTypeAndName test_bigint(column->get_ptr(),
std::make_shared<DataTypeInt64>(),
+ "test_bigint");
+
+ Block block({test_bigint});
+ TypeDescriptor source_type(PrimitiveType::TYPE_BIGINT);
+ BigintBucketPartitionColumnTransform transform(source_type, 16);
+
+ auto result = transform.apply(block, 0);
+
+ const auto& result_data = assert_cast<const
ColumnInt32*>(result.column.get())->get_data();
+ std::vector<int32_t> expected_data = {3, 2};
+ EXPECT_EQ(expected_data.size(), result_data.size());
+ for (size_t i = 0; i < result_data.size(); ++i) {
+ EXPECT_EQ(expected_data[i], result_data[i]);
+ }
+}
+
+TEST_F(PartitionTransformersTest, test_decimal32_bucket_transform) {
+ const std::vector<int32_t> values({1420}); // -500754589
+ auto column = ColumnDecimal32::create(0, 2);
+ column->insert_many_fix_len_data(reinterpret_cast<const
char*>(values.data()), values.size());
+ ColumnWithTypeAndName test_decimal32(column->get_ptr(),
+
std::make_shared<DataTypeDecimal<Decimal32>>(4, 2),
+ "test_decimal32");
+
+ Block block({test_decimal32});
+ TypeDescriptor source_type = TypeDescriptor::create_decimalv3_type(4, 2);
+ DecimalBucketPartitionColumnTransform<Decimal32> transform(source_type,
16);
+
+ auto result = transform.apply(block, 0);
+
+ const auto& result_data = assert_cast<const
ColumnInt32*>(result.column.get())->get_data();
+ std::vector<int32_t> expected_data = {3};
+ EXPECT_EQ(expected_data.size(), result_data.size());
+ for (size_t i = 0; i < result_data.size(); ++i) {
+ EXPECT_EQ(expected_data[i], result_data[i]);
+ }
+}
+
+TEST_F(PartitionTransformersTest, test_date_bucket_transform) {
+ auto column = ColumnDateV2::create();
+ auto& date_v2_data = column->get_data();
+ DateV2Value<DateV2ValueType> value;
+ value.set_time(2017, 11, 16, 0, 0, 0, 0); // -653330422
+ date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value));
+ ColumnWithTypeAndName test_date(column->get_ptr(),
std::make_shared<DataTypeDateV2>(),
+ "test_date");
+
+ Block block({test_date});
+ TypeDescriptor source_type(PrimitiveType::TYPE_DATEV2);
+ DateBucketPartitionColumnTransform transform(source_type, 16);
+
+ auto result = transform.apply(block, 0);
+
+ const auto& result_data = assert_cast<const
ColumnInt32*>(result.column.get())->get_data();
+ std::vector<int32_t> expected_data = {10};
+ EXPECT_EQ(expected_data.size(), result_data.size());
+ for (size_t i = 0; i < result_data.size(); ++i) {
+ EXPECT_EQ(expected_data[i], result_data[i]);
+ }
+}
+
+TEST_F(PartitionTransformersTest, test_timestamp_bucket_transform) {
+ auto column = ColumnDateTimeV2::create();
+ auto& datetime_v2_data = column->get_data();
+ DateV2Value<DateTimeV2ValueType> value;
+ value.set_time(2017, 11, 16, 22, 31, 8, 0); // -2047944441
+ datetime_v2_data.push_back(*reinterpret_cast<vectorized::UInt64*>(&value));
+ ColumnWithTypeAndName test_timestamp(column->get_ptr(),
std::make_shared<DataTypeDateTimeV2>(),
+ "test_timestamp");
+
+ Block block({test_timestamp});
+ TypeDescriptor source_type(PrimitiveType::TYPE_DATETIMEV2);
+ TimestampBucketPartitionColumnTransform transform(source_type, 16);
+
+ auto result = transform.apply(block, 0);
+
+ const auto& result_data = assert_cast<const
ColumnInt32*>(result.column.get())->get_data();
+ std::vector<int32_t> expected_data = {7};
+ EXPECT_EQ(expected_data.size(), result_data.size());
+ for (size_t i = 0; i < result_data.size(); ++i) {
+ EXPECT_EQ(expected_data[i], result_data[i]);
+ }
+}
+
+TEST_F(PartitionTransformersTest, test_string_bucket_transform) {
+ const std::vector<StringRef> values({{"iceberg", sizeof("iceberg") - 1}});
// 1210000089
+ auto column = ColumnString::create();
+ column->insert_many_strings(&values[0], values.size());
+ ColumnWithTypeAndName test_string(column->get_ptr(),
std::make_shared<DataTypeString>(),
+ "test_string");
+
+ Block block({test_string});
+ TypeDescriptor source_type(PrimitiveType::TYPE_STRING);
+ StringBucketPartitionColumnTransform transform(source_type, 16);
+
+ auto result = transform.apply(block, 0);
+
+ const auto& result_data = assert_cast<const
ColumnInt32*>(result.column.get())->get_data();
+ std::vector<int32_t> expected_data = {9};
+ EXPECT_EQ(expected_data.size(), result_data.size());
+ for (size_t i = 0; i < result_data.size(); ++i) {
+ EXPECT_EQ(expected_data[i], result_data[i]);
+ }
+}
+
+TEST_F(PartitionTransformersTest, test_date_year_transform) {
+ auto column = ColumnDateV2::create();
+ auto& date_v2_data = column->get_data();
+ DateV2Value<DateV2ValueType> value;
+ value.set_time(2017, 11, 16, 0, 0, 0, 0);
+ date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value));
+ ColumnWithTypeAndName test_date(column->get_ptr(),
std::make_shared<DataTypeDateV2>(),
+ "test_date");
+
+ Block block({test_date});
+ TypeDescriptor source_type(PrimitiveType::TYPE_DATEV2);
+ DateYearPartitionColumnTransform transform(source_type);
+
+ auto result = transform.apply(block, 0);
+
+ const auto& result_data = assert_cast<const
ColumnInt32*>(result.column.get())->get_data();
+ std::vector<int32_t> expected_data = {47};
+ std::vector<std::string> expected_human_string = {"2017"};
+ EXPECT_EQ(expected_data.size(), result_data.size());
+ for (size_t i = 0; i < result_data.size(); ++i) {
+ EXPECT_EQ(expected_data[i], result_data[i]);
+ EXPECT_EQ(expected_human_string[i],
+ transform.to_human_string(transform.get_result_type(),
result_data[i]));
+ }
+}
+
+TEST_F(PartitionTransformersTest, test_timestamp_year_transform) {
+ auto column = ColumnDateTimeV2::create();
+ auto& datetime_v2_data = column->get_data();
+ DateV2Value<DateTimeV2ValueType> value;
+ value.set_time(2017, 11, 16, 22, 31, 8, 0);
+ datetime_v2_data.push_back(*reinterpret_cast<vectorized::UInt64*>(&value));
+ ColumnWithTypeAndName test_timestamp(column->get_ptr(),
std::make_shared<DataTypeDateTimeV2>(),
+ "test_timestamp");
+
+ Block block({test_timestamp});
+ TypeDescriptor source_type(PrimitiveType::TYPE_DATETIMEV2);
+ TimestampYearPartitionColumnTransform transform(source_type);
+
+ auto result = transform.apply(block, 0);
+
+ const auto& result_data = assert_cast<const
ColumnInt32*>(result.column.get())->get_data();
+ std::vector<int32_t> expected_data = {47};
+ std::vector<std::string> expected_human_string = {"2017"};
+ EXPECT_EQ(expected_data.size(), result_data.size());
+ for (size_t i = 0; i < result_data.size(); ++i) {
+ EXPECT_EQ(expected_data[i], result_data[i]);
+ EXPECT_EQ(expected_human_string[i],
+ transform.to_human_string(transform.get_result_type(),
result_data[i]));
+ }
+}
+
+TEST_F(PartitionTransformersTest, test_date_month_transform) {
+ auto column = ColumnDateV2::create();
+ auto& date_v2_data = column->get_data();
+ DateV2Value<DateV2ValueType> value;
+ value.set_time(2017, 11, 16, 0, 0, 0, 0);
+ date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value));
+ ColumnWithTypeAndName test_date(column->get_ptr(),
std::make_shared<DataTypeDateV2>(),
+ "test_date");
+
+ Block block({test_date});
+ TypeDescriptor source_type(PrimitiveType::TYPE_DATEV2);
+ DateMonthPartitionColumnTransform transform(source_type);
+
+ auto result = transform.apply(block, 0);
+
+ const auto& result_data = assert_cast<const
ColumnInt32*>(result.column.get())->get_data();
+ std::vector<int32_t> expected_data = {574};
+ std::vector<std::string> expected_human_string = {"2017-11"};
+ EXPECT_EQ(expected_data.size(), result_data.size());
+ for (size_t i = 0; i < result_data.size(); ++i) {
+ EXPECT_EQ(expected_data[i], result_data[i]);
+ EXPECT_EQ(expected_human_string[i],
+ transform.to_human_string(transform.get_result_type(),
result_data[i]));
+ }
+}
+
+TEST_F(PartitionTransformersTest, test_timestamp_month_transform) {
+ auto column = ColumnDateTimeV2::create();
+ auto& datetime_v2_data = column->get_data();
+ DateV2Value<DateTimeV2ValueType> value;
+ value.set_time(2017, 11, 16, 22, 31, 8, 0);
+ datetime_v2_data.push_back(*reinterpret_cast<vectorized::UInt64*>(&value));
+ ColumnWithTypeAndName test_timestamp(column->get_ptr(),
std::make_shared<DataTypeDateTimeV2>(),
+ "test_timestamp");
+
+ Block block({test_timestamp});
+ TypeDescriptor source_type(PrimitiveType::TYPE_DATETIMEV2);
+ TimestampMonthPartitionColumnTransform transform(source_type);
+
+ auto result = transform.apply(block, 0);
+
+ const auto& result_data = assert_cast<const
ColumnInt32*>(result.column.get())->get_data();
+ std::vector<int32_t> expected_data = {574};
+ std::vector<std::string> expected_human_string = {"2017-11"};
+ EXPECT_EQ(expected_data.size(), result_data.size());
+ for (size_t i = 0; i < result_data.size(); ++i) {
+ EXPECT_EQ(expected_data[i], result_data[i]);
+ EXPECT_EQ(expected_human_string[i],
+ transform.to_human_string(transform.get_result_type(),
result_data[i]));
+ }
+}
+
+TEST_F(PartitionTransformersTest, test_date_day_transform) {
+ auto column = ColumnDateV2::create();
+ auto& date_v2_data = column->get_data();
+ DateV2Value<DateV2ValueType> value;
+ value.set_time(2017, 11, 16, 0, 0, 0, 0);
+ date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value));
+ ColumnWithTypeAndName test_date(column->get_ptr(),
std::make_shared<DataTypeDateV2>(),
+ "test_date");
+
+ Block block({test_date});
+ TypeDescriptor source_type(PrimitiveType::TYPE_DATEV2);
+ DateDayPartitionColumnTransform transform(source_type);
+
+ auto result = transform.apply(block, 0);
+
+ const auto& result_data = assert_cast<const
ColumnInt32*>(result.column.get())->get_data();
+ std::vector<int32_t> expected_data = {17486};
+ std::vector<std::string> expected_human_string = {"2017-11-16"};
+ EXPECT_EQ(expected_data.size(), result_data.size());
+ for (size_t i = 0; i < result_data.size(); ++i) {
+ EXPECT_EQ(expected_data[i], result_data[i]);
+ EXPECT_EQ(expected_human_string[i],
+ transform.to_human_string(transform.get_result_type(),
result_data[i]));
+ }
+}
+
+TEST_F(PartitionTransformersTest, test_timestamp_day_transform) {
+ auto column = ColumnDateTimeV2::create();
+ auto& datetime_v2_data = column->get_data();
+ DateV2Value<DateTimeV2ValueType> value;
+ value.set_time(2017, 11, 16, 22, 31, 8, 0);
+ datetime_v2_data.push_back(*reinterpret_cast<vectorized::UInt64*>(&value));
+ ColumnWithTypeAndName test_timestamp(column->get_ptr(),
std::make_shared<DataTypeDateTimeV2>(),
+ "test_timestamp");
+
+ Block block({test_timestamp});
+ TypeDescriptor source_type(PrimitiveType::TYPE_DATETIMEV2);
+ TimestampDayPartitionColumnTransform transform(source_type);
+
+ auto result = transform.apply(block, 0);
+
+ const auto& result_data = assert_cast<const
ColumnInt32*>(result.column.get())->get_data();
+ std::vector<int32_t> expected_data = {17486};
+ std::vector<std::string> expected_human_string = {"2017-11-16"};
+ EXPECT_EQ(expected_data.size(), result_data.size());
+ for (size_t i = 0; i < result_data.size(); ++i) {
+ EXPECT_EQ(expected_data[i], result_data[i]);
+ EXPECT_EQ(expected_human_string[i],
+ transform.to_human_string(transform.get_result_type(),
result_data[i]));
+ }
+}
+
+TEST_F(PartitionTransformersTest, test_timestamp_hour_transform) {
+ auto column = ColumnDateTimeV2::create();
+ auto& datetime_v2_data = column->get_data();
+ DateV2Value<DateTimeV2ValueType> value;
+ value.set_time(2017, 11, 16, 22, 31, 8, 0);
+ datetime_v2_data.push_back(*reinterpret_cast<vectorized::UInt64*>(&value));
+ ColumnWithTypeAndName test_timestamp(column->get_ptr(),
std::make_shared<DataTypeDateTimeV2>(),
+ "test_timestamp");
+
+ Block block({test_timestamp});
+ TypeDescriptor source_type(PrimitiveType::TYPE_DATETIMEV2);
+ TimestampHourPartitionColumnTransform transform(source_type);
+
+ auto result = transform.apply(block, 0);
+
+ const auto& result_data = assert_cast<const
ColumnInt32*>(result.column.get())->get_data();
+ std::vector<int32_t> expected_data = {419686};
+ std::vector<std::string> expected_human_string = {"2017-11-16-22"};
+ EXPECT_EQ(expected_data.size(), result_data.size());
+ for (size_t i = 0; i < result_data.size(); ++i) {
+ EXPECT_EQ(expected_data[i], result_data[i]);
+ EXPECT_EQ(expected_human_string[i],
+ transform.to_human_string(transform.get_result_type(),
result_data[i]));
+ }
+}
+
+TEST_F(PartitionTransformersTest, test_void_transform) {
+ const std::vector<int32_t> values({1, -1});
+ auto column = ColumnInt32::create();
+ column->insert_many_fix_len_data(reinterpret_cast<const
char*>(values.data()), values.size());
+ ColumnWithTypeAndName test_int(column->get_ptr(),
std::make_shared<DataTypeInt32>(),
+ "test_int");
+
+ Block block({test_int});
+ TypeDescriptor source_type(PrimitiveType::TYPE_INT);
+ VoidPartitionColumnTransform transform(source_type);
+
+ auto result = transform.apply(block, 0);
+
+ const auto& result_null_map_data =
+ assert_cast<const
ColumnNullable*>(result.column.get())->get_null_map_data();
+
+ for (size_t i = 0; i < result_null_map_data.size(); ++i) {
+ EXPECT_EQ(1, result_null_map_data[i]);
+ }
+}
+
+TEST_F(PartitionTransformersTest,
test_nullable_column_integer_truncate_transform) {
+ const std::vector<int32_t> values({1, -1});
+ auto column = ColumnNullable::create(ColumnInt32::create(),
ColumnUInt8::create());
+ column->insert_data(nullptr, 0);
+ column->insert_many_fix_len_data(reinterpret_cast<const
char*>(values.data()), values.size());
+ ColumnWithTypeAndName test_int(
+ column->get_ptr(),
+
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>()),
"test_int");
+
+ Block block({test_int});
+ TypeDescriptor source_type(PrimitiveType::TYPE_INT);
+ IntegerTruncatePartitionColumnTransform transform(source_type, 10);
+
+ auto result = transform.apply(block, 0);
+
+ std::vector<int32_t> expected_data = {0, -10};
+ std::vector<std::string> expected_human_string = {"0", "-10"};
+ const auto* result_column = assert_cast<const
ColumnNullable*>(result.column.get());
+ const auto& result_data =
+ assert_cast<const
ColumnInt32*>(result_column->get_nested_column_ptr().get())
+ ->get_data();
+ const auto& null_map_column = result_column->get_null_map_column();
+
+ EXPECT_EQ(1, null_map_column[0]);
+ EXPECT_EQ(0, null_map_column[1]);
+ EXPECT_EQ(0, null_map_column[2]);
+
+ for (size_t i = 0, j = 0; i < result_column->size(); ++i) {
+ if (null_map_column[i] == 0) {
+ EXPECT_EQ(expected_data[j], result_data[i]);
+ EXPECT_EQ(expected_human_string[j],
+ transform.to_human_string(transform.get_result_type(),
result_data[i]));
+ ++j;
+ }
+ }
+}
+
+} // namespace doris::vectorized
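The truncate expectations above follow Iceberg's floor-based truncation, truncate(v, W) = v - (((v % W) + W) % W), so negative inputs round down rather than toward zero, and decimals are truncated on their unscaled value. A small stand-alone sketch (plain Java, illustrative only; TruncateSketch is a made-up name):

    public class TruncateSketch {
        // Floor-based truncation, equivalent to v - (((v % w) + w) % w).
        static long truncate(long v, long w) {
            return v - Math.floorMod(v, w);
        }

        public static void main(String[] args) {
            System.out.println(truncate(1, 10));    // 0
            System.out.println(truncate(-1, 10));   // -10
            System.out.println(truncate(1065, 50)); // 1050, i.e. decimal 10.65 -> 10.50 at scale 2
        }
    }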
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/info/SimpleTableInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/common/info/SimpleTableInfo.java
new file mode 100644
index 00000000000..6fdb27e1d0b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/info/SimpleTableInfo.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+//
https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/AnalysisException.java
+// and modified by Doris
+
+package org.apache.doris.common.info;
+
+import java.util.Objects;
+
+public class SimpleTableInfo {
+
+ private final String dbName;
+ private final String tbName;
+
+ public SimpleTableInfo(String dbName, String tbName) {
+ this.dbName = dbName;
+ this.tbName = tbName;
+ }
+
+ public String getDbName() {
+ return dbName;
+ }
+
+ public String getTbName() {
+ return tbName;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(dbName, tbName);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+
+ SimpleTableInfo that = (SimpleTableInfo) other;
+ return Objects.equals(dbName, that.dbName) && Objects.equals(tbName,
that.tbName);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s.%s", dbName, tbName);
+ }
+}
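SimpleTableInfo is a small value object keyed on (dbName, tbName); the equals/hashCode/toString overrides are what make it usable as a map key and in log messages. A hypothetical usage sketch (not part of the patch; the table names are taken from the test below):

    import org.apache.doris.common.info.SimpleTableInfo;

    import java.util.HashMap;
    import java.util.Map;

    public class SimpleTableInfoSketch {
        public static void main(String[] args) {
            Map<SimpleTableInfo, Long> txnByTable = new HashMap<>();
            txnByTable.put(new SimpleTableInfo("db3", "tbWithPartition"), 1L);
            // equals()/hashCode() are value-based, so a fresh instance with the
            // same names resolves to the same entry.
            System.out.println(txnByTable.get(new SimpleTableInfo("db3", "tbWithPartition"))); // 1
            System.out.println(new SimpleTableInfo("db3", "tbWithPartition")); // db3.tbWithPartition
        }
    }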
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index acda08b7378..68064c4e439 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
@@ -85,6 +86,20 @@ public class IcebergMetadataCache {
return tableCache.get(key);
}
+ public Table getAndCloneTable(CatalogIf catalog, String dbName, String
tbName) {
+ Table restTable;
+ synchronized (this) {
+ Table table = getIcebergTable(catalog, dbName, tbName);
+ restTable = SerializableTable.copyOf(table);
+ }
+ return restTable;
+ }
+
+ public Table getRemoteTable(CatalogIf catalog, String dbName, String
tbName) {
+ IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(catalog,
dbName, tbName);
+ return loadTable(key);
+ }
+
@NotNull
private List<Snapshot> loadSnapshots(IcebergMetadataCacheKey key) {
Table icebergTable = getIcebergTable(key.catalog, key.dbName,
key.tableName);
@@ -116,7 +131,7 @@ public class IcebergMetadataCache {
public void invalidateCatalogCache(long catalogId) {
snapshotListCache.asMap().keySet().stream()
.filter(key -> key.catalog.getId() == catalogId)
- .forEach(snapshotListCache::invalidate);
+ .forEach(snapshotListCache::invalidate);
tableCache.asMap().entrySet().stream()
.filter(entry -> entry.getKey().catalog.getId() == catalogId)
@@ -130,7 +145,7 @@ public class IcebergMetadataCache {
snapshotListCache.asMap().keySet().stream()
.filter(key -> key.catalog.getId() == catalogId &&
key.dbName.equals(dbName) && key.tableName.equals(
tblName))
- .forEach(snapshotListCache::invalidate);
+ .forEach(snapshotListCache::invalidate);
tableCache.asMap().entrySet().stream()
.filter(entry -> {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index c7fef68ee97..a6933f83d76 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -64,6 +64,10 @@ public class IcebergMetadataOps implements
ExternalMetadataOps {
return catalog;
}
+ public IcebergExternalCatalog getExternalCatalog() {
+ return dorisCatalog;
+ }
+
@Override
public void close() {
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
index 5025e075142..a3a978ccd7a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
@@ -21,31 +21,39 @@
package org.apache.doris.datasource.iceberg;
import org.apache.doris.common.UserException;
-import org.apache.doris.thrift.TFileContent;
+import org.apache.doris.common.info.SimpleTableInfo;
+import org.apache.doris.datasource.iceberg.helper.IcebergWriterHelper;
+import
org.apache.doris.nereids.trees.plans.commands.insert.BaseExternalTableInsertCommandContext;
+import
org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
import org.apache.doris.thrift.TIcebergCommitData;
+import org.apache.doris.thrift.TUpdateMode;
import org.apache.doris.transaction.Transaction;
-import com.google.common.base.VerifyException;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.DataFiles;
-import org.apache.iceberg.FileContent;
-import org.apache.iceberg.Metrics;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.WriteResult;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
-import java.util.stream.Collectors;
public class IcebergTransaction implements Transaction {
private static final Logger LOG =
LogManager.getLogger(IcebergTransaction.class);
+
private final IcebergMetadataOps ops;
+ private SimpleTableInfo tableInfo;
+ private Table table;
+
+
private org.apache.iceberg.Transaction transaction;
private final List<TIcebergCommitData> commitDataList =
Lists.newArrayList();
@@ -59,140 +67,123 @@ public class IcebergTransaction implements Transaction {
}
}
- public void beginInsert(String dbName, String tbName) {
- Table icebergTable =
ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbName));
- transaction = icebergTable.newTransaction();
+ public void beginInsert(SimpleTableInfo tableInfo) {
+ this.tableInfo = tableInfo;
+ this.table = getNativeTable(tableInfo);
+ this.transaction = table.newTransaction();
}
- public void finishInsert() {
- Table icebergTable = transaction.table();
- AppendFiles appendFiles = transaction.newAppend();
-
- for (CommitTaskData task : convertToCommitTaskData()) {
- DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
- .withPath(task.getPath())
- .withFileSizeInBytes(task.getFileSizeInBytes())
- .withFormat(IcebergUtils.getFileFormat(icebergTable))
- .withMetrics(task.getMetrics());
-
- if (icebergTable.spec().isPartitioned()) {
- List<String> partitionValues = task.getPartitionValues()
- .orElseThrow(() -> new VerifyException("No partition
data for partitioned table"));
- builder.withPartitionValues(partitionValues);
- }
- appendFiles.appendFile(builder.build());
+ public void finishInsert(SimpleTableInfo tableInfo,
Optional<InsertCommandContext> insertCtx) {
+ if (LOG.isDebugEnabled()) {
+ LOG.info("iceberg table {} insert table finished!", tableInfo);
}
- // in appendFiles.commit, it will generate metadata(manifest and
snapshot)
- // after appendFiles.commit, in current transaction, you can already
see the new snapshot
- appendFiles.commit();
- }
-
- public List<CommitTaskData> convertToCommitTaskData() {
- List<CommitTaskData> commitTaskData = new ArrayList<>();
- for (TIcebergCommitData data : this.commitDataList) {
- commitTaskData.add(new CommitTaskData(
- data.getFilePath(),
- data.getFileSize(),
- new Metrics(
- data.getRowCount(),
- Collections.EMPTY_MAP,
- Collections.EMPTY_MAP,
- Collections.EMPTY_MAP,
- Collections.EMPTY_MAP
- ),
- data.isSetPartitionValues() ?
Optional.of(data.getPartitionValues()) : Optional.empty(),
- convertToFileContent(data.getFileContent()),
- data.isSetReferencedDataFiles() ?
Optional.of(data.getReferencedDataFiles()) : Optional.empty()
- ));
+ //determine the update mode (append or overwrite) from the insert context
+ TUpdateMode updateMode = TUpdateMode.APPEND;
+ if (insertCtx.isPresent()) {
+ updateMode = ((BaseExternalTableInsertCommandContext)
insertCtx.get()).isOverwrite() ? TUpdateMode.OVERWRITE
+ : TUpdateMode.APPEND;
}
- return commitTaskData;
+ updateManifestAfterInsert(updateMode);
}
- private FileContent convertToFileContent(TFileContent content) {
- if (content.equals(TFileContent.DATA)) {
- return FileContent.DATA;
- } else if (content.equals(TFileContent.POSITION_DELETES)) {
- return FileContent.POSITION_DELETES;
+ private void updateManifestAfterInsert(TUpdateMode updateMode) {
+ PartitionSpec spec = table.spec();
+ FileFormat fileFormat = IcebergUtils.getFileFormat(table);
+
+ //convert commitDataList to writeResult
+ WriteResult writeResult = IcebergWriterHelper
+ .convertToWriterResult(fileFormat, spec, commitDataList);
+ List<WriteResult> pendingResults = Lists.newArrayList(writeResult);
+
+ if (spec.isPartitioned()) {
+ partitionManifestUpdate(updateMode, table, pendingResults);
+ if (LOG.isDebugEnabled()) {
+ LOG.info("{} {} table partition manifest successful and
writeResult : {}..", tableInfo, updateMode,
+ writeResult);
+ }
} else {
- return FileContent.EQUALITY_DELETES;
+ tableManifestUpdate(updateMode, table, pendingResults);
+ if (LOG.isDebugEnabled()) {
+ LOG.info("{} {} table manifest successful and writeResult :
{}..", tableInfo, updateMode,
+ writeResult);
+ }
}
}
@Override
public void commit() throws UserException {
- // Externally readable
- // Manipulate the relevant data so that others can also see the latest
table, such as:
- // 1. hadoop: it will change the version number information in
'version-hint.text'
- // 2. hive: it will change the table properties, the most important
thing is to revise 'metadata_location'
- // 3. and so on ...
+ // commit the iceberg transaction
transaction.commitTransaction();
}
@Override
public void rollback() {
-
+ //do nothing
}
public long getUpdateCnt() {
return
commitDataList.stream().mapToLong(TIcebergCommitData::getRowCount).sum();
}
- public static class CommitTaskData {
- private final String path;
- private final long fileSizeInBytes;
- private final Metrics metrics;
- private final Optional<List<String>> partitionValues;
- private final FileContent content;
- private final Optional<List<String>> referencedDataFiles;
-
- public CommitTaskData(String path,
- long fileSizeInBytes,
- Metrics metrics,
- Optional<List<String>> partitionValues,
- FileContent content,
- Optional<List<String>> referencedDataFiles) {
- this.path = path;
- this.fileSizeInBytes = fileSizeInBytes;
- this.metrics = metrics;
- this.partitionValues =
convertPartitionValuesForNull(partitionValues);
- this.content = content;
- this.referencedDataFiles = referencedDataFiles;
- }
- private Optional<List<String>>
convertPartitionValuesForNull(Optional<List<String>> partitionValues) {
- if (!partitionValues.isPresent()) {
- return partitionValues;
- }
- List<String> values = partitionValues.get();
- if (!values.contains("null")) {
- return partitionValues;
- }
- return Optional.of(values.stream().map(s -> s.equals("null") ?
null : s).collect(Collectors.toList()));
- }
+ private synchronized Table getNativeTable(SimpleTableInfo tableInfo) {
+ Objects.requireNonNull(tableInfo);
+ IcebergExternalCatalog externalCatalog = ops.getExternalCatalog();
+ return IcebergUtils.getRemoteTable(externalCatalog, tableInfo);
+ }
- public String getPath() {
- return path;
+ private void partitionManifestUpdate(TUpdateMode updateMode, Table table,
List<WriteResult> pendingResults) {
+ if (Objects.isNull(pendingResults) || pendingResults.isEmpty()) {
+ LOG.warn("{} partitionManifestUp method call but pendingResults is
null or empty!", table.name());
+ return;
}
-
- public long getFileSizeInBytes() {
- return fileSizeInBytes;
+ // Commit via append or replace-partitions depending on the update mode.
+ if (updateMode == TUpdateMode.APPEND) {
+ commitAppendTxn(table, pendingResults);
+ } else {
+ ReplacePartitions appendPartitionOp = table.newReplacePartitions();
+ for (WriteResult result : pendingResults) {
+ Preconditions.checkState(result.referencedDataFiles().length
== 0,
+ "Should have no referenced data files.");
+
Arrays.stream(result.dataFiles()).forEach(appendPartitionOp::addFile);
+ }
+ appendPartitionOp.commit();
}
+ }
- public Metrics getMetrics() {
- return metrics;
+ private void tableManifestUpdate(TUpdateMode updateMode, Table table,
List<WriteResult> pendingResults) {
+ if (Objects.isNull(pendingResults) || pendingResults.isEmpty()) {
+ LOG.warn("{} tableManifestUp method call but pendingResults is
null or empty!", table.name());
+ return;
}
-
- public Optional<List<String>> getPartitionValues() {
- return partitionValues;
+ // Commit via append or replace-partitions depending on the update mode.
+ if (LOG.isDebugEnabled()) {
+ LOG.info("{} tableManifestUp method call ", table.name());
}
-
- public FileContent getContent() {
- return content;
+ if (updateMode == TUpdateMode.APPEND) {
+ commitAppendTxn(table, pendingResults);
+ } else {
+ ReplacePartitions appendPartitionOp = table.newReplacePartitions();
+ for (WriteResult result : pendingResults) {
+ Preconditions.checkState(result.referencedDataFiles().length
== 0,
+ "Should have no referenced data files.");
+
Arrays.stream(result.dataFiles()).forEach(appendPartitionOp::addFile);
+ }
+ appendPartitionOp.commit();
}
+ }
- public Optional<List<String>> getReferencedDataFiles() {
- return referencedDataFiles;
+
+ private void commitAppendTxn(Table table, List<WriteResult>
pendingResults) {
+ // To be compatible with iceberg format V1.
+ AppendFiles appendFiles = table.newAppend();
+ for (WriteResult result : pendingResults) {
+ Preconditions.checkState(result.referencedDataFiles().length == 0,
+ "Should have no referenced data files for append.");
+ Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
}
+ appendFiles.commit();
}
+
}
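The reworked IcebergTransaction now follows a begin / collect / finish / commit lifecycle driven from the FE. A hypothetical driver sketch, assuming the transaction has already been registered with the transaction manager (the real caller is IcebergInsertExecutor, shown further below; runInsert and the table names are made up):

    import org.apache.doris.common.UserException;
    import org.apache.doris.common.info.SimpleTableInfo;
    import org.apache.doris.datasource.iceberg.IcebergTransaction;
    import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;

    import java.util.Optional;

    public class IcebergTxnFlowSketch {
        static void runInsert(IcebergTransaction txn, Optional<InsertCommandContext> insertCtx)
                throws UserException {
            SimpleTableInfo tableInfo = new SimpleTableInfo("db3", "tbWithPartition");
            txn.beginInsert(tableInfo);             // resolve the native table, open an Iceberg transaction
            // ... BE writers report TIcebergCommitData via updateIcebergCommitData(...) ...
            long loadedRows = txn.getUpdateCnt();   // sum of row counts reported by the writers
            txn.finishInsert(tableInfo, insertCtx); // append, or replace partitions on overwrite
            txn.commit();                           // publish the new snapshot
        }
    }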
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index 2aa5dda35a4..512e6a3ee93 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -43,6 +43,7 @@ import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.info.SimpleTableInfo;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
@@ -50,6 +51,7 @@ import
org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.thrift.TExprOpcode;
import com.google.common.collect.Lists;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
@@ -87,6 +89,8 @@ public class IcebergUtils {
// https://iceberg.apache.org/spec/#schemas-and-data-types
// All time and timestamp values are stored with microsecond precision
private static final int ICEBERG_DATETIME_SCALE_MS = 6;
+ private static final String PARQUET_NAME = "parquet";
+ private static final String ORC_NAME = "orc";
public static final String TOTAL_RECORDS = "total-records";
public static final String TOTAL_POSITION_DELETES =
"total-position-deletes";
@@ -522,8 +526,8 @@ public class IcebergUtils {
case MAP:
Types.MapType map = (Types.MapType) type;
return new MapType(
- icebergTypeToDorisType(map.keyType()),
- icebergTypeToDorisType(map.valueType())
+ icebergTypeToDorisType(map.keyType()),
+ icebergTypeToDorisType(map.valueType())
);
case STRUCT:
Types.StructType struct = (Types.StructType) type;
@@ -536,11 +540,30 @@ public class IcebergUtils {
}
}
+
public static org.apache.iceberg.Table getIcebergTable(ExternalCatalog
catalog, String dbName, String tblName) {
+ return getIcebergTableInternal(catalog, dbName, tblName, false);
+ }
+
+ public static org.apache.iceberg.Table getAndCloneTable(ExternalCatalog
catalog, SimpleTableInfo tableInfo) {
+ return getIcebergTableInternal(catalog, tableInfo.getDbName(),
tableInfo.getTbName(), true);
+ }
+
+ public static org.apache.iceberg.Table getRemoteTable(ExternalCatalog
catalog, SimpleTableInfo tableInfo) {
return Env.getCurrentEnv()
.getExtMetaCacheMgr()
.getIcebergMetadataCache()
- .getIcebergTable(catalog, dbName, tblName);
+ .getRemoteTable(catalog, tableInfo.getDbName(),
tableInfo.getTbName());
+ }
+
+ private static org.apache.iceberg.Table
getIcebergTableInternal(ExternalCatalog catalog, String dbName,
+ String tblName,
+ boolean isClone) {
+ IcebergMetadataCache metadataCache = Env.getCurrentEnv()
+ .getExtMetaCacheMgr()
+ .getIcebergMetadataCache();
+ return isClone ? metadataCache.getAndCloneTable(catalog, dbName,
tblName)
+ : metadataCache.getIcebergTable(catalog, dbName, tblName);
}
/**
@@ -587,17 +610,27 @@ public class IcebergUtils {
return -1;
}
- public static String getFileFormat(Table table) {
- Map<String, String> properties = table.properties();
+
+ public static FileFormat getFileFormat(Table icebergTable) {
+ Map<String, String> properties = icebergTable.properties();
+ String fileFormatName;
if (properties.containsKey(WRITE_FORMAT)) {
- return properties.get(WRITE_FORMAT);
+ fileFormatName = properties.get(WRITE_FORMAT);
+ } else {
+ fileFormatName =
properties.getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, PARQUET_NAME);
}
- if (properties.containsKey(TableProperties.DEFAULT_FILE_FORMAT)) {
- return properties.get(TableProperties.DEFAULT_FILE_FORMAT);
+ FileFormat fileFormat;
+ if (fileFormatName.toLowerCase().contains(ORC_NAME)) {
+ fileFormat = FileFormat.ORC;
+ } else if (fileFormatName.toLowerCase().contains(PARQUET_NAME)) {
+ fileFormat = FileFormat.PARQUET;
+ } else {
+ throw new RuntimeException("Unsupported input format type: " +
fileFormatName);
}
- return TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+ return fileFormat;
}
+
public static String getFileCompress(Table table) {
Map<String, String> properties = table.properties();
if (properties.containsKey(COMPRESSION_CODEC)) {
@@ -605,11 +638,11 @@ public class IcebergUtils {
} else if (properties.containsKey(SPARK_SQL_COMPRESSION_CODEC)) {
return properties.get(SPARK_SQL_COMPRESSION_CODEC);
}
- String fileFormat = getFileFormat(table);
- if (fileFormat.equalsIgnoreCase("parquet")) {
+ FileFormat fileFormat = getFileFormat(table);
+ if (fileFormat == FileFormat.PARQUET) {
return properties.getOrDefault(
TableProperties.PARQUET_COMPRESSION,
TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0);
- } else if (fileFormat.equalsIgnoreCase("orc")) {
+ } else if (fileFormat == FileFormat.ORC) {
return properties.getOrDefault(
TableProperties.ORC_COMPRESSION,
TableProperties.ORC_COMPRESSION_DEFAULT);
}
@@ -620,9 +653,10 @@ public class IcebergUtils {
Map<String, String> properties = table.properties();
if
(properties.containsKey(TableProperties.WRITE_LOCATION_PROVIDER_IMPL)) {
throw new NotSupportedException(
- "Table " + table.name() + " specifies " +
properties.get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL)
- + " as a location provider. "
- + "Writing to Iceberg tables with custom location provider
is not supported.");
+ "Table " + table.name() + " specifies " + properties
+ .get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL)
+ + " as a location provider. "
+ + "Writing to Iceberg tables with custom location
provider is not supported.");
}
String dataLocation =
properties.get(TableProperties.WRITE_DATA_LOCATION);
if (dataLocation == null) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java
new file mode 100644
index 00000000000..4171a0536f9
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.helper;
+
+import org.apache.doris.datasource.statistics.CommonStatistics;
+import org.apache.doris.thrift.TIcebergCommitData;
+
+import com.google.common.base.VerifyException;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.io.WriteResult;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class IcebergWriterHelper {
+
+ private static final int DEFAULT_FILE_COUNT = 1;
+
+ public static WriteResult convertToWriterResult(
+ FileFormat format,
+ PartitionSpec spec,
+ List<TIcebergCommitData> commitDataList) {
+ List<DataFile> dataFiles = new ArrayList<>();
+ for (TIcebergCommitData commitData : commitDataList) {
+ //get the data file path
+ String location = commitData.getFilePath();
+
+ //get the commit file statistics
+ long fileSize = commitData.getFileSize();
+ long recordCount = commitData.getRowCount();
+ CommonStatistics stat = new CommonStatistics(recordCount,
DEFAULT_FILE_COUNT, fileSize);
+
+ Optional<List<String>> partValues = Optional.empty();
+ //get and check partitionValues when the table is partitioned
+ if (spec.isPartitioned()) {
+ List<String> partitionValues = commitData.getPartitionValues();
+ if (Objects.isNull(partitionValues) ||
partitionValues.isEmpty()) {
+ throw new VerifyException("No partition data for
partitioned table");
+ }
+ partitionValues = partitionValues.stream().map(s ->
s.equals("null") ? null : s)
+ .collect(Collectors.toList());
+ partValues = Optional.of(partitionValues);
+ }
+ DataFile dataFile = genDataFile(format, location, spec,
partValues, stat);
+ dataFiles.add(dataFile);
+ }
+ return WriteResult.builder()
+ .addDataFiles(dataFiles)
+ .build();
+
+ }
+
+ public static DataFile genDataFile(
+ FileFormat format,
+ String location,
+ PartitionSpec spec,
+ Optional<List<String>> partValues,
+ CommonStatistics statistics) {
+
+ DataFiles.Builder builder = DataFiles.builder(spec)
+ .withPath(location)
+ .withFileSizeInBytes(statistics.getTotalFileBytes())
+ .withRecordCount(statistics.getRowCount())
+ .withFormat(format);
+
+ partValues.ifPresent(builder::withPartitionValues);
+
+ return builder.build();
+ }
+}
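One detail worth calling out: partition values arrive from the BE as strings, and a null partition value is encoded as the literal string "null" (see get_partition_value in partition_transformers.cpp). The helper maps that marker back to a real null before building the DataFile. A tiny stand-alone sketch of just that mapping (illustrative values):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class PartitionValueSketch {
        public static void main(String[] args) {
            List<String> fromBe = Arrays.asList("2017-11-16", "null");
            // Map the BE's "null" marker back to a real null.
            List<String> partValues = fromBe.stream()
                    .map(s -> s.equals("null") ? null : s)
                    .collect(Collectors.toList());
            System.out.println(partValues); // [2017-11-16, null]
        }
    }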
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
index e590e918344..56ff188f964 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
@@ -61,7 +61,7 @@ public class IcebergApiSource implements IcebergSource {
@Override
public String getFileFormat() {
- return IcebergUtils.getFileFormat(originTable);
+ return IcebergUtils.getFileFormat(originTable).name();
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
index 06b785a15f8..5e9860171d0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
@@ -41,7 +41,7 @@ public class IcebergHMSSource implements IcebergSource {
private final org.apache.iceberg.Table icebergTable;
public IcebergHMSSource(HMSExternalTable hmsTable, TupleDescriptor desc,
- Map<String, ColumnRange> columnNameToRange) {
+ Map<String, ColumnRange> columnNameToRange) {
this.hmsTable = hmsTable;
this.desc = desc;
this.columnNameToRange = columnNameToRange;
@@ -58,7 +58,7 @@ public class IcebergHMSSource implements IcebergSource {
@Override
public String getFileFormat() throws DdlException, MetaNotFoundException {
- return IcebergUtils.getFileFormat(icebergTable);
+ return IcebergUtils.getFileFormat(icebergTable).name();
}
public org.apache.iceberg.Table getIcebergTable() throws
MetaNotFoundException {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/statistics/CommonStatistics.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/statistics/CommonStatistics.java
new file mode 100644
index 00000000000..9685dfdf35a
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/statistics/CommonStatistics.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.statistics;
+
+public class CommonStatistics {
+
+ public static final CommonStatistics EMPTY = new CommonStatistics(0L, 0L,
0L);
+
+ private final long rowCount;
+ private final long fileCount;
+ private final long totalFileBytes;
+
+ public CommonStatistics(long rowCount, long fileCount, long
totalFileBytes) {
+ this.fileCount = fileCount;
+ this.rowCount = rowCount;
+ this.totalFileBytes = totalFileBytes;
+ }
+
+ public long getRowCount() {
+ return rowCount;
+ }
+
+ public long getFileCount() {
+ return fileCount;
+ }
+
+ public long getTotalFileBytes() {
+ return totalFileBytes;
+ }
+
+ public static CommonStatistics reduce(
+ CommonStatistics current,
+ CommonStatistics update,
+ ReduceOperator operator) {
+ return new CommonStatistics(
+ reduce(current.getRowCount(), update.getRowCount(), operator),
+ reduce(current.getFileCount(), update.getFileCount(),
operator),
+ reduce(current.getTotalFileBytes(),
update.getTotalFileBytes(), operator));
+ }
+
+ public static long reduce(long current, long update, ReduceOperator
operator) {
+ if (current >= 0 && update >= 0) {
+ switch (operator) {
+ case ADD:
+ return current + update;
+ case SUBTRACT:
+ return current - update;
+ case MAX:
+ return Math.max(current, update);
+ case MIN:
+ return Math.min(current, update);
+ default:
+ throw new IllegalArgumentException("Unexpected operator: "
+ operator);
+ }
+ }
+
+ return 0;
+ }
+
+ public enum ReduceOperator {
+ ADD,
+ SUBTRACT,
+ MIN,
+ MAX,
+ }
+}
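CommonStatistics is a simple immutable triple of (rowCount, fileCount, totalFileBytes) with an element-wise reduce. A short usage sketch (not part of the patch; the numbers are made up):

    import org.apache.doris.datasource.statistics.CommonStatistics;
    import org.apache.doris.datasource.statistics.CommonStatistics.ReduceOperator;

    public class CommonStatisticsSketch {
        public static void main(String[] args) {
            CommonStatistics a = new CommonStatistics(100L, 1L, 4096L); // rows, files, bytes
            CommonStatistics b = new CommonStatistics(50L, 1L, 2048L);
            CommonStatistics sum = CommonStatistics.reduce(a, b, ReduceOperator.ADD);
            // -> rowCount=150, fileCount=2, totalFileBytes=6144
            System.out.println(sum.getRowCount() + " " + sum.getFileCount() + " " + sum.getTotalFileBytes());
        }
    }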
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
index b19c483c9f3..86b1f1ef0b7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
@@ -18,6 +18,7 @@
package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.info.SimpleTableInfo;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergTransaction;
import org.apache.doris.nereids.NereidsPlanner;
@@ -39,9 +40,9 @@ public class IcebergInsertExecutor extends
BaseExternalTableInsertExecutor {
* constructor
*/
public IcebergInsertExecutor(ConnectContext ctx, IcebergExternalTable
table,
- String labelName, NereidsPlanner planner,
- Optional<InsertCommandContext> insertCtx,
- boolean emptyInsert) {
+ String labelName, NereidsPlanner planner,
+ Optional<InsertCommandContext> insertCtx,
+ boolean emptyInsert) {
super(ctx, table, labelName, planner, insertCtx, emptyInsert);
}
@@ -51,11 +52,23 @@ public class IcebergInsertExecutor extends
BaseExternalTableInsertExecutor {
coordinator.setIcebergCommitDataFunc(transaction::updateIcebergCommitData);
}
+ @Override
+ protected void beforeExec() {
+ String dbName = ((IcebergExternalTable) table).getDbName();
+ String tbName = table.getName();
+ SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbName);
+ IcebergTransaction transaction = (IcebergTransaction)
transactionManager.getTransaction(txnId);
+ transaction.beginInsert(tableInfo);
+ }
+
@Override
protected void doBeforeCommit() throws UserException {
+ String dbName = ((IcebergExternalTable) table).getDbName();
+ String tbName = table.getName();
+ SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbName);
IcebergTransaction transaction = (IcebergTransaction)
transactionManager.getTransaction(txnId);
- loadedRows = transaction.getUpdateCnt();
- transaction.finishInsert();
+ this.loadedRows = transaction.getUpdateCnt();
+ transaction.finishInsert(tableInfo, insertCtx);
}
@Override
@@ -63,9 +76,4 @@ public class IcebergInsertExecutor extends
BaseExternalTableInsertExecutor {
return TransactionType.ICEBERG;
}
- @Override
- protected void beforeExec() {
- IcebergTransaction transaction = (IcebergTransaction)
transactionManager.getTransaction(txnId);
- transaction.beginInsert(((IcebergExternalTable) table).getDbName(),
table.getName());
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
index 659be7cb1fe..0e01b599964 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
@@ -121,7 +121,7 @@ public class IcebergTableSink extends
BaseExternalTableDataSink {
}
// file info
-
tSink.setFileFormat(getTFileFormatType(IcebergUtils.getFileFormat(icebergTable)));
+
tSink.setFileFormat(getTFileFormatType(IcebergUtils.getFileFormat(icebergTable).name()));
tSink.setCompressionType(getTFileCompressType(IcebergUtils.getFileCompress(icebergTable)));
// hadoop config
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
index 4f4fe956d4b..f373c133685 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
@@ -17,6 +17,7 @@
package org.apache.doris.transaction;
+
import org.apache.doris.catalog.Env;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.iceberg.IcebergMetadataOps;
@@ -58,12 +59,12 @@ public class IcebergTransactionManager implements TransactionManager {
}
@Override
- public Transaction getTransaction(long id) {
+ public IcebergTransaction getTransaction(long id) {
return getTransactionWithException(id);
}
- public Transaction getTransactionWithException(long id) {
- Transaction icebergTransaction = transactions.get(id);
+ public IcebergTransaction getTransactionWithException(long id) {
+ IcebergTransaction icebergTransaction = transactions.get(id);
if (icebergTransaction == null) {
throw new RuntimeException("Can't find transaction for " + id);
}
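The override now declares the narrower IcebergTransaction return type, which Java allows as a covariant return: callers holding an IcebergTransactionManager reference get the concrete type without a cast, while code typed against the TransactionManager interface still sees a Transaction. A generic sketch of the language rule, with hypothetical names:

    interface Txn {}

    class IcebergTxn implements Txn {}

    interface TxnManager {
        Txn get(long id);
    }

    class IcebergTxnManager implements TxnManager {
        @Override
        public IcebergTxn get(long id) { // covariant return: narrower than Txn
            return new IcebergTxn();
        }
    }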
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
index 10de5427902..4375dc5c025 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
@@ -18,15 +18,22 @@
package org.apache.doris.datasource.iceberg;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.info.SimpleTableInfo;
+import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.thrift.TFileContent;
import org.apache.doris.thrift.TIcebergCommitData;
+import com.google.common.collect.Maps;
+import mockit.Mock;
+import mockit.MockUp;
+import org.apache.commons.lang3.SerializationUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expression;
@@ -39,10 +46,11 @@ import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
+import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
@@ -50,23 +58,26 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
public class IcebergTransactionTest {
- public static String dbName = "db3";
- public static String tbWithPartition = "tbWithPartition";
- public static String tbWithoutPartition = "tbWithoutPartition";
- public static IcebergMetadataOps ops;
- public static Schema schema;
+ private static String dbName = "db3";
+ private static String tbWithPartition = "tbWithPartition";
+ private static String tbWithoutPartition = "tbWithoutPartition";
- @BeforeClass
- public static void beforeClass() throws IOException {
+ private IcebergExternalCatalog externalCatalog;
+ private IcebergMetadataOps ops;
+
+
+ @Before
+ public void init() throws IOException {
createCatalog();
createTable();
}
- public static void createCatalog() throws IOException {
+ private void createCatalog() throws IOException {
Path warehousePath = Files.createTempDirectory("test_warehouse_");
String warehouse = "file://" + warehousePath.toAbsolutePath() + "/";
HadoopCatalog hadoopCatalog = new HadoopCatalog();
@@ -74,25 +85,32 @@ public class IcebergTransactionTest {
props.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
hadoopCatalog.setConf(new Configuration());
hadoopCatalog.initialize("df", props);
- ops = new IcebergMetadataOps(null, hadoopCatalog);
+ this.externalCatalog = new IcebergHMSExternalCatalog(1L, "iceberg", "", Maps.newHashMap(), "");
+ new MockUp<IcebergHMSExternalCatalog>() {
+ @Mock
+ public Catalog getCatalog() {
+ return hadoopCatalog;
+ }
+ };
+ ops = new IcebergMetadataOps(externalCatalog, hadoopCatalog);
}
- public static void createTable() throws IOException {
+ private void createTable() throws IOException {
HadoopCatalog icebergCatalog = (HadoopCatalog) ops.getCatalog();
icebergCatalog.createNamespace(Namespace.of(dbName));
- schema = new Schema(
- Types.NestedField.required(11, "ts1", Types.TimestampType.withoutZone()),
- Types.NestedField.required(12, "ts2", Types.TimestampType.withoutZone()),
- Types.NestedField.required(13, "ts3", Types.TimestampType.withoutZone()),
- Types.NestedField.required(14, "ts4", Types.TimestampType.withoutZone()),
- Types.NestedField.required(15, "dt1", Types.DateType.get()),
- Types.NestedField.required(16, "dt2", Types.DateType.get()),
- Types.NestedField.required(17, "dt3", Types.DateType.get()),
- Types.NestedField.required(18, "dt4", Types.DateType.get()),
- Types.NestedField.required(19, "str1", Types.StringType.get()),
- Types.NestedField.required(20, "str2", Types.StringType.get()),
- Types.NestedField.required(21, "int1", Types.IntegerType.get()),
- Types.NestedField.required(22, "int2", Types.IntegerType.get())
+ Schema schema = new Schema(
+ Types.NestedField.required(11, "ts1", Types.TimestampType.withoutZone()),
+ Types.NestedField.required(12, "ts2", Types.TimestampType.withoutZone()),
+ Types.NestedField.required(13, "ts3", Types.TimestampType.withoutZone()),
+ Types.NestedField.required(14, "ts4", Types.TimestampType.withoutZone()),
+ Types.NestedField.required(15, "dt1", Types.DateType.get()),
+ Types.NestedField.required(16, "dt2", Types.DateType.get()),
+ Types.NestedField.required(17, "dt3", Types.DateType.get()),
+ Types.NestedField.required(18, "dt4", Types.DateType.get()),
+ Types.NestedField.required(19, "str1", Types.StringType.get()),
+ Types.NestedField.required(20, "str2", Types.StringType.get()),
+ Types.NestedField.required(21, "int1", Types.IntegerType.get()),
+ Types.NestedField.required(22, "int2", Types.IntegerType.get())
);
PartitionSpec partitionSpec = PartitionSpec.builderFor(schema)
@@ -112,7 +130,7 @@ public class IcebergTransactionTest {
icebergCatalog.createTable(TableIdentifier.of(dbName, tbWithoutPartition), schema);
}
- public List<String> createPartitionValues() {
+ private List<String> createPartitionValues() {
Instant instant = Instant.parse("2024-12-11T12:34:56.123456Z");
long ts = DateTimeUtil.microsFromInstant(instant);
@@ -165,14 +183,23 @@ public class IcebergTransactionTest {
ctdList.add(ctd1);
ctdList.add(ctd2);
+ Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithPartition));
+
+ new MockUp<IcebergUtils>() {
+ @Mock
+ public Table getRemoteTable(ExternalCatalog catalog, SimpleTableInfo tableInfo) {
+ return table;
+ }
+ };
+
IcebergTransaction txn = getTxn();
txn.updateIcebergCommitData(ctdList);
- txn.beginInsert(dbName, tbWithPartition);
- txn.finishInsert();
+ SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbWithPartition);
+ txn.beginInsert(tableInfo);
+ txn.finishInsert(tableInfo, Optional.empty());
txn.commit();
- Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithPartition));
- checkSnapshotProperties(table.currentSnapshot().summary(), "6", "2", "6");
+ checkSnapshotProperties(table.currentSnapshot().summary(), "6", "2", "6");
checkPushDownByPartitionForTs(table, "ts1");
checkPushDownByPartitionForTs(table, "ts2");
checkPushDownByPartitionForTs(table, "ts3");
@@ -189,7 +216,7 @@ public class IcebergTransactionTest {
checkPushDownByPartitionForBucketInt(table, "int1");
}
- public void checkPushDownByPartitionForBucketInt(Table table, String column) {
+ private void checkPushDownByPartitionForBucketInt(Table table, String column) {
// (BucketUtil.hash(15) & Integer.MAX_VALUE) % 2 = 0
Integer i1 = 15;
@@ -212,12 +239,12 @@ public class IcebergTransactionTest {
checkPushDownByPartition(table, greaterThan2, 2);
}
- public void checkPushDownByPartitionForString(Table table, String column) {
+ private void checkPushDownByPartitionForString(Table table, String column) {
// Since the string used to create the partition is in date format, the date check can be reused directly
checkPushDownByPartitionForDt(table, column);
}
- public void checkPushDownByPartitionForTs(Table table, String column) {
+ private void checkPushDownByPartitionForTs(Table table, String column) {
String lessTs = "2023-12-11T12:34:56.123456";
String eqTs = "2024-12-11T12:34:56.123456";
String greaterTs = "2025-12-11T12:34:56.123456";
@@ -230,7 +257,7 @@ public class IcebergTransactionTest {
checkPushDownByPartition(table, greaterThan, 0);
}
- public void checkPushDownByPartitionForDt(Table table, String column) {
+ private void checkPushDownByPartitionForDt(Table table, String column) {
String less = "2023-12-11";
String eq = "2024-12-11";
String greater = "2025-12-11";
@@ -243,7 +270,7 @@ public class IcebergTransactionTest {
checkPushDownByPartition(table, greaterThan, 0);
}
- public void checkPushDownByPartition(Table table, Expression expr, Integer expectFiles) {
+ private void checkPushDownByPartition(Table table, Expression expr, Integer expectFiles) {
CloseableIterable<FileScanTask> fileScanTasks = table.newScan().filter(expr).planFiles();
AtomicReference<Integer> cnt = new AtomicReference<>(0);
fileScanTasks.forEach(notUse -> cnt.updateAndGet(v -> v + 1));
@@ -268,45 +295,64 @@ public class IcebergTransactionTest {
ctdList.add(ctd1);
ctdList.add(ctd2);
+ Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithoutPartition));
+ new MockUp<IcebergUtils>() {
+ @Mock
+ public Table getRemoteTable(ExternalCatalog catalog, SimpleTableInfo tableInfo) {
+ return table;
+ }
+ };
+
IcebergTransaction txn = getTxn();
txn.updateIcebergCommitData(ctdList);
- txn.beginInsert(dbName, tbWithoutPartition);
- txn.finishInsert();
+ SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbWithPartition);
+ txn.beginInsert(tableInfo);
+ txn.finishInsert(tableInfo, Optional.empty());
txn.commit();
- Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithoutPartition));
checkSnapshotProperties(table.currentSnapshot().summary(), "6", "2", "6");
}
- public void checkSnapshotProperties(Map<String, String> props,
- String addRecords,
- String addFileCnt,
- String addFileSize) {
+ private IcebergTransaction getTxn() {
+ return new IcebergTransaction(ops);
+ }
+
+ private void checkSnapshotProperties(Map<String, String> props,
+ String addRecords,
+ String addFileCnt,
+ String addFileSize) {
Assert.assertEquals(addRecords, props.get("added-records"));
Assert.assertEquals(addFileCnt, props.get("added-data-files"));
Assert.assertEquals(addFileSize, props.get("added-files-size"));
}
- public String numToYear(Integer num) {
+ private String numToYear(Integer num) {
Transform<Object, Integer> year = Transforms.year();
return year.toHumanString(Types.IntegerType.get(), num);
}
- public String numToMonth(Integer num) {
+ private String numToMonth(Integer num) {
Transform<Object, Integer> month = Transforms.month();
return month.toHumanString(Types.IntegerType.get(), num);
}
- public String numToDay(Integer num) {
+ private String numToDay(Integer num) {
Transform<Object, Integer> day = Transforms.day();
return day.toHumanString(Types.IntegerType.get(), num);
}
- public String numToHour(Integer num) {
+ private String numToHour(Integer num) {
Transform<Object, Integer> hour = Transforms.hour();
return hour.toHumanString(Types.IntegerType.get(), num);
}
+ @Test
+ public void tableCloneTest() {
+ Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithoutPartition));
+ Table cloneTable = (Table) SerializationUtils.clone((Serializable) table);
+ Assert.assertNotNull(cloneTable);
+ }
+
@Test
public void testTransform() {
Instant instant = Instant.parse("2024-12-11T12:34:56.123456Z");
@@ -322,7 +368,4 @@ public class IcebergTransactionTest {
Assert.assertEquals("2024-12-11", numToDay(dt));
}
- public IcebergTransaction getTxn() {
- return new IcebergTransaction(ops);
- }
}
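The reworked test replaces static setup with JMockit fakes: instantiating an anonymous MockUp<T> redefines the matching methods of T for the rest of the test, which is how getCatalog() and IcebergUtils.getRemoteTable(...) are stubbed above. A stripped-down sketch of the pattern with a hypothetical target class:

    import mockit.Mock;
    import mockit.MockUp;

    class RemoteService {
        String fetch(String key) {
            throw new IllegalStateException("would talk to a real backend");
        }
    }

    // Inside a test method: every subsequent RemoteService.fetch(...) call returns the canned value.
    new MockUp<RemoteService>() {
        @Mock
        String fetch(String key) {
            return "stubbed-" + key;
        }
    };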
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]