This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d8d9f0a0de4 [Feature-WIP](iceberg-writer) Implements iceberg partition
transform. (#36289)
d8d9f0a0de4 is described below
commit d8d9f0a0de486eb29b1f31f9b9fdb35bf54cc8af
Author: kang <[email protected]>
AuthorDate: Sat Jun 22 20:54:15 2024 +0800
[Feature-WIP](iceberg-writer) Implements iceberg partition transform.
(#36289)
#31442
Adds Iceberg writer functionality so that Doris can write data directly into the
data lake:
1. Support INSERT INTO an Iceberg table by appending HDFS files.
2. Implement Iceberg partition routing through partitionTransform:
2.1) Serialize the partition spec and schema to JSON on the FE side, then
deserialize them on the BE side to obtain the schema and partition information
of the Iceberg table.
2.2) Implement Iceberg's Identity, Bucket, Year/Month/Day and other
partition strategies through partitionTransform and template
classes.
3. Manage transactions through IcebergTransaction:
3.1) After the BE has finished writing files, it reports CommitData to the FE
at partition granularity.
3.2) After receiving the CommitData, the FE commits the metadata to Iceberg in
IcebergTransaction.
### Future work
- Add unit test for partition transform function.
- Implement partition transform function with exchange sink turned on.
- The partition transform function omits the processing of bigint type.
---------
Co-authored-by: lik40 <[email protected]>
Co-authored-by: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
---
be/src/util/bit_util.h | 22 +
.../sink/writer/iceberg/partition_transformers.cpp | 168 ++-
.../sink/writer/iceberg/partition_transformers.h | 1275 +++++++++++++++++++-
.../sink/writer/iceberg/viceberg_table_writer.cpp | 22 +-
.../apache/doris/common/info/SimpleTableInfo.java | 66 +
.../datasource/iceberg/IcebergMetadataCache.java | 19 +-
.../datasource/iceberg/IcebergMetadataOps.java | 5 +
.../datasource/iceberg/IcebergTransaction.java | 211 ++--
.../doris/datasource/iceberg/IcebergUtils.java | 64 +-
.../iceberg/helper/IcebergWriterHelper.java | 91 ++
.../iceberg/source/IcebergApiSource.java | 2 +-
.../iceberg/source/IcebergHMSSource.java | 4 +-
.../datasource/statistics/CommonStatistics.java | 81 ++
.../commands/insert/IcebergInsertExecutor.java | 28 +-
.../org/apache/doris/planner/IcebergTableSink.java | 2 +-
.../transaction/IcebergTransactionManager.java | 7 +-
.../datasource/iceberg/IcebergTransactionTest.java | 139 ++-
17 files changed, 1979 insertions(+), 227 deletions(-)
diff --git a/be/src/util/bit_util.h b/be/src/util/bit_util.h
index 230134ade09..6934f45ef3e 100644
--- a/be/src/util/bit_util.h
+++ b/be/src/util/bit_util.h
@@ -98,6 +98,28 @@ public:
return (v << n) >> n;
}
+    // Serializes `input` into its minimal big-endian two's-complement byte string: the
+    // least significant bytes are emitted until the remaining value is pure sign
+    // extension, then the bytes are reversed so the most significant byte comes first.
+    // Examples: 0 -> {0x00}, 128 -> {0x00, 0x80}, -1 -> {0xFF}, -129 -> {0xFF, 0x7F}.
+    template <typename T>
+    static std::string IntToByteBuffer(T input) {
+        std::string buffer;
+        T value = input;
+        for (size_t i = 0; i < sizeof(value); ++i) {
+            // Take the low byte as an explicitly signed 8-bit value. Plain `char` is
+            // unsigned on some platforms, which would make the sign tests below wrong.
+            auto value_to_save = static_cast<int8_t>(value & 0xFF);
+            buffer.push_back(static_cast<char>(value_to_save));
+            // Remove the just processed part from the input so that we can exit early if
+            // there is nothing left to process.
+            value >>= 8;
+            // Non-negative number fully emitted and the top bit of the last byte is clear.
+            if (value == 0 && value_to_save >= 0) {
+                break;
+            }
+            // Negative number fully emitted and the top bit of the last byte is set.
+            if (value == -1 && value_to_save < 0) {
+                break;
+            }
+        }
+        std::reverse(buffer.begin(), buffer.end());
+        return buffer;
+    }
+
// Returns ceil(log2(x)).
// TODO: this could be faster if we use __builtin_clz. Fix this if this
ever shows up
// in a hot path.
diff --git a/be/src/vec/sink/writer/iceberg/partition_transformers.cpp
b/be/src/vec/sink/writer/iceberg/partition_transformers.cpp
index 0faebea6295..6b2a3305b9e 100644
--- a/be/src/vec/sink/writer/iceberg/partition_transformers.cpp
+++ b/be/src/vec/sink/writer/iceberg/partition_transformers.cpp
@@ -25,6 +25,9 @@
namespace doris {
namespace vectorized {
+// system_clock time point for time_t value 0; used by PartitionColumnTransformUtils when
+// converting partition ordinals to calendar dates.
+const std::chrono::time_point<std::chrono::system_clock>
PartitionColumnTransformUtils::EPOCH =
+ std::chrono::system_clock::from_time_t(0);
+
std::unique_ptr<PartitionColumnTransform> PartitionColumnTransforms::create(
const doris::iceberg::PartitionField& field, const TypeDescriptor&
source_type) {
auto& transform = field.transform();
@@ -33,23 +36,98 @@ std::unique_ptr<PartitionColumnTransform>
PartitionColumnTransforms::create(
if (std::regex_match(transform, width_match, hasWidth)) {
std::string name = width_match[1];
- //int parsed_width = std::stoi(width_match[2]);
+ int parsed_width = std::stoi(width_match[2]);
if (name == "truncate") {
switch (source_type.type) {
+ case TYPE_INT: {
+ return
std::make_unique<IntegerTruncatePartitionColumnTransform>(source_type,
+
parsed_width);
+ }
+ case TYPE_BIGINT: {
+ return
std::make_unique<BigintTruncatePartitionColumnTransform>(source_type,
+
parsed_width);
+ }
+ case TYPE_VARCHAR:
+ case TYPE_CHAR:
+ case TYPE_STRING: {
+ return
std::make_unique<StringTruncatePartitionColumnTransform>(source_type,
+
parsed_width);
+ }
+ case TYPE_DECIMALV2: {
+ return
std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal128V2>>(
+ source_type, parsed_width);
+ }
+ case TYPE_DECIMAL32: {
+ return
std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal32>>(
+ source_type, parsed_width);
+ }
+ case TYPE_DECIMAL64: {
+ return
std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal64>>(
+ source_type, parsed_width);
+ }
+ case TYPE_DECIMAL128I: {
+ return
std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal128V3>>(
+ source_type, parsed_width);
+ }
+ case TYPE_DECIMAL256: {
+ return
std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal256>>(
+ source_type, parsed_width);
+ }
default: {
- throw doris::Exception(
- doris::ErrorCode::INTERNAL_ERROR,
- "Unsupported type for truncate partition column
transform {}",
- source_type.debug_string());
+ throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+ "Unsupported type {} for partition
column transform {}",
+ source_type.debug_string(), transform);
}
}
} else if (name == "bucket") {
switch (source_type.type) {
+ case TYPE_INT: {
+ return
std::make_unique<IntBucketPartitionColumnTransform>(source_type,
+
parsed_width);
+ }
+ case TYPE_BIGINT: {
+ return
std::make_unique<BigintBucketPartitionColumnTransform>(source_type,
+
parsed_width);
+ }
+ case TYPE_VARCHAR:
+ case TYPE_CHAR:
+ case TYPE_STRING: {
+ return
std::make_unique<StringBucketPartitionColumnTransform>(source_type,
+
parsed_width);
+ }
+ case TYPE_DATEV2: {
+ return
std::make_unique<DateBucketPartitionColumnTransform>(source_type,
+
parsed_width);
+ }
+ case TYPE_DATETIMEV2: {
+ return
std::make_unique<TimestampBucketPartitionColumnTransform>(source_type,
+
parsed_width);
+ }
+ case TYPE_DECIMALV2: {
+ return
std::make_unique<DecimalBucketPartitionColumnTransform<Decimal128V2>>(
+ source_type, parsed_width);
+ }
+ case TYPE_DECIMAL32: {
+ return
std::make_unique<DecimalBucketPartitionColumnTransform<Decimal32>>(
+ source_type, parsed_width);
+ }
+ case TYPE_DECIMAL64: {
+ return
std::make_unique<DecimalBucketPartitionColumnTransform<Decimal64>>(
+ source_type, parsed_width);
+ }
+ case TYPE_DECIMAL128I: {
+ return
std::make_unique<DecimalBucketPartitionColumnTransform<Decimal128V3>>(
+ source_type, parsed_width);
+ }
+ case TYPE_DECIMAL256: {
+ return
std::make_unique<DecimalBucketPartitionColumnTransform<Decimal256>>(
+ source_type, parsed_width);
+ }
default: {
throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
- "Unsupported type for bucket partition
column transform {}",
- source_type.debug_string());
+ "Unsupported type {} for partition
column transform {}",
+ source_type.debug_string(), transform);
}
}
}
@@ -57,14 +135,79 @@ std::unique_ptr<PartitionColumnTransform>
PartitionColumnTransforms::create(
if (transform == "identity") {
return std::make_unique<IdentityPartitionColumnTransform>(source_type);
+ } else if (transform == "year") {
+ switch (source_type.type) {
+ case TYPE_DATEV2: {
+ return
std::make_unique<DateYearPartitionColumnTransform>(source_type);
+ }
+ case TYPE_DATETIMEV2: {
+ return
std::make_unique<TimestampYearPartitionColumnTransform>(source_type);
+ }
+ default: {
+ throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+ "Unsupported type {} for partition column
transform {}",
+ source_type.debug_string(), transform);
+ }
+ }
+ } else if (transform == "month") {
+ switch (source_type.type) {
+ case TYPE_DATEV2: {
+ return
std::make_unique<DateMonthPartitionColumnTransform>(source_type);
+ }
+ case TYPE_DATETIMEV2: {
+ return
std::make_unique<TimestampMonthPartitionColumnTransform>(source_type);
+ }
+ default: {
+ throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+ "Unsupported type {} for partition column
transform {}",
+ source_type.debug_string(), transform);
+ }
+ }
+ } else if (transform == "day") {
+ switch (source_type.type) {
+ case TYPE_DATEV2: {
+ return
std::make_unique<DateDayPartitionColumnTransform>(source_type);
+ }
+ case TYPE_DATETIMEV2: {
+ return
std::make_unique<TimestampDayPartitionColumnTransform>(source_type);
+ }
+ default: {
+ throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+ "Unsupported type {} for partition column
transform {}",
+ source_type.debug_string(), transform);
+ }
+ }
+ } else if (transform == "hour") {
+ switch (source_type.type) {
+ case TYPE_DATETIMEV2: {
+ return
std::make_unique<TimestampHourPartitionColumnTransform>(source_type);
+ }
+ default: {
+ throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+ "Unsupported type {} for partition column
transform {}",
+ source_type.debug_string(), transform);
+ }
+ }
+ } else if (transform == "void") {
+ return std::make_unique<VoidPartitionColumnTransform>(source_type);
} else {
throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
- "Unsupported partition column transform: {}.",
transform);
+ "Unsupported type {} for partition column
transform {}",
+ source_type.debug_string(), transform);
}
}
+// Name reported by the base class; the concrete transforms override this with their own
+// name.
+std::string PartitionColumnTransform::name() const {
+ return "default";
+}
+
+// Human-readable form of a partition value. The base implementation simply delegates to
+// get_partition_value().
std::string PartitionColumnTransform::to_human_string(const TypeDescriptor&
type,
const std::any& value)
const {
+ return get_partition_value(type, value);
+}
+
+std::string PartitionColumnTransform::get_partition_value(const
TypeDescriptor& type,
+ const std::any&
value) const {
if (value.has_value()) {
switch (type.type) {
case TYPE_BOOLEAN: {
@@ -131,19 +274,12 @@ std::string
PartitionColumnTransform::to_human_string(const TypeDescriptor& type
}
default: {
throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
- "Unsupported partition column transform:
{}",
- type.debug_string());
+ "Unsupported type {} for partition",
type.debug_string());
}
}
}
return "null";
}
-ColumnWithTypeAndName IdentityPartitionColumnTransform::apply(Block& block,
int idx) {
- const ColumnWithTypeAndName& column_with_type_and_name =
block.get_by_position(idx);
- return {column_with_type_and_name.column, column_with_type_and_name.type,
- column_with_type_and_name.name};
-}
-
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/sink/writer/iceberg/partition_transformers.h
b/be/src/vec/sink/writer/iceberg/partition_transformers.h
index bfa2c43d2e5..7db8bfb1886 100644
--- a/be/src/vec/sink/writer/iceberg/partition_transformers.h
+++ b/be/src/vec/sink/writer/iceberg/partition_transformers.h
@@ -43,25 +43,81 @@ public:
const doris::iceberg::PartitionField& field, const TypeDescriptor&
source_type);
};
+// Helpers shared by the Iceberg partition transforms: cached epoch date values and
+// conversion of partition ordinals (years / months / days / hours counted from
+// 1970-01-01 00:00) into the strings used for partition values.
+class PartitionColumnTransformUtils {
+public:
+    // Returns the DateV2 value parsed from "1970-01-01 00:00:00".
+    static DateV2Value<DateV2ValueType>& epoch_date() {
+        // A function-local static with an initializer is constructed exactly once, even
+        // under concurrent first calls; a hand-rolled `initialized` flag is not.
+        static DateV2Value<DateV2ValueType> epoch_date = [] {
+            DateV2Value<DateV2ValueType> value;
+            value.from_date_str("1970-01-01 00:00:00", 19);
+            return value;
+        }();
+        return epoch_date;
+    }
+
+    // Returns the DateTimeV2 value parsed from "1970-01-01 00:00:00".
+    static DateV2Value<DateTimeV2ValueType>& epoch_datetime() {
+        static DateV2Value<DateTimeV2ValueType> epoch_datetime = [] {
+            DateV2Value<DateTimeV2ValueType> value;
+            value.from_date_str("1970-01-01 00:00:00", 19);
+            return value;
+        }();
+        return epoch_datetime;
+    }
+
+    // Formats a year ordinal (years since 1970, may be negative), e.g. 54 -> "2024".
+    static std::string human_year(int year_ordinal) {
+        // Plain calendar arithmetic. std::chrono::years is an average-length duration
+        // (365.2425 days), so adding it to a time point drifts across year boundaries.
+        return std::to_string(EPOCH_YEAR + year_ordinal);
+    }
+
+    // Formats a month ordinal (months since 1970-01, may be negative) as "YYYY-MM",
+    // e.g. 1 -> "1970-02", -1 -> "1969-12".
+    static std::string human_month(int month_ordinal) {
+        // Floor division, so months before the epoch map to the preceding year.
+        int year = EPOCH_YEAR + month_ordinal / 12;
+        int month_index = month_ordinal % 12;
+        if (month_index < 0) {
+            month_index += 12;
+            --year;
+        }
+        return fmt::format("{:04d}-{:02d}", year, static_cast<unsigned>(month_index + 1));
+    }
+
+    // Formats a day ordinal (days since 1970-01-01, may be negative) as "YYYY-MM-DD".
+    static std::string human_day(int day_ordinal) {
+        auto ymd = std::chrono::year_month_day(std::chrono::sys_days(
+                std::chrono::floor<std::chrono::days>(EPOCH + std::chrono::days(day_ordinal))));
+        return fmt::format("{:04d}-{:02d}-{:02d}", static_cast<int>(ymd.year()),
+                           static_cast<unsigned>(ymd.month()), static_cast<unsigned>(ymd.day()));
+    }
+
+    // Formats an hour ordinal (hours since 1970-01-01 00:00, may be negative) as
+    // "YYYY-MM-DD-HH", e.g. -1 -> "1969-12-31-23".
+    static std::string human_hour(int hour_ordinal) {
+        // Floor division: C++ `/` and `%` truncate toward zero, which would yield a
+        // negative hour and the wrong day for ordinals before the epoch.
+        int day_value = hour_ordinal / 24;
+        int hour_value = hour_ordinal % 24;
+        if (hour_value < 0) {
+            hour_value += 24;
+            --day_value;
+        }
+        auto ymd = std::chrono::year_month_day(std::chrono::sys_days(
+                std::chrono::floor<std::chrono::days>(EPOCH + std::chrono::days(day_value))));
+        return fmt::format("{:04d}-{:02d}-{:02d}-{:02d}", static_cast<int>(ymd.year()),
+                           static_cast<unsigned>(ymd.month()), static_cast<unsigned>(ymd.day()),
+                           hour_value);
+    }
+
+private:
+    // Calendar year that ordinal 0 refers to.
+    static constexpr int EPOCH_YEAR = 1970;
+    // Time point the day/hour ordinals are added to; defined in partition_transformers.cpp.
+    static const std::chrono::time_point<std::chrono::system_clock> EPOCH;
+    // Utility class: not meant to be instantiated.
+    PartitionColumnTransformUtils() = default;
+};
+
class PartitionColumnTransform {
public:
PartitionColumnTransform() = default;
virtual ~PartitionColumnTransform() = default;
- virtual bool preserves_non_null() const { return false; }
-
- virtual bool monotonic() const { return true; }
-
- virtual bool temporal() const { return false; }
+ virtual std::string name() const;
virtual const TypeDescriptor& get_result_type() const = 0;
- virtual bool is_void() const { return false; }
-
- virtual ColumnWithTypeAndName apply(Block& block, int idx) = 0;
+ virtual ColumnWithTypeAndName apply(Block& block, int column_pos) = 0;
virtual std::string to_human_string(const TypeDescriptor& type, const
std::any& value) const;
+
+ virtual std::string get_partition_value(const TypeDescriptor& type,
+ const std::any& value) const;
};
class IdentityPartitionColumnTransform : public PartitionColumnTransform {
@@ -69,12 +125,1211 @@ public:
IdentityPartitionColumnTransform(const TypeDescriptor& source_type)
: _source_type(source_type) {}
- virtual const TypeDescriptor& get_result_type() const { return
_source_type; }
+ std::string name() const override { return "Identity"; }
+
+ const TypeDescriptor& get_result_type() const override { return
_source_type; }
+
+ ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+ const ColumnWithTypeAndName& column_with_type_and_name =
block.get_by_position(column_pos);
+ return {column_with_type_and_name.column,
column_with_type_and_name.type,
+ column_with_type_and_name.name};
+ }
+
+private:
+ TypeDescriptor _source_type;
+};
+
+// Iceberg `truncate[W]` transform for string columns, implemented by running
+// SubstringUtil::substring_execute() with position 1 and length W.
+class StringTruncatePartitionColumnTransform : public PartitionColumnTransform {
+public:
+    StringTruncatePartitionColumnTransform(const TypeDescriptor& source_type, int width)
+            : _source_type(source_type), _width(width) {}
+
+    std::string name() const override { return "StringTruncate"; }
+
+    const TypeDescriptor& get_result_type() const override { return _source_type; }
+
+    // Returns a new column holding the truncated strings of block column `column_pos`,
+    // keeping the source column's name and (if nullable) its null map. Temporary argument
+    // and result columns are appended to `block` while the substring runs and are erased
+    // again before returning; the source column is put back unchanged.
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        auto int_type = std::make_shared<DataTypeInt32>();
+        const size_t num_columns_without_result = block.columns();
+        // Take a copy rather than a reference: the block.insert() calls below grow the
+        // block, so a reference into it could dangle by the time `name` is read.
+        const ColumnWithTypeAndName column_with_type_and_name = block.get_by_position(column_pos);
+
+        ColumnPtr string_column_ptr;
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (auto* nullable_column =
+                    check_and_get_column<ColumnNullable>(column_with_type_and_name.column)) {
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            string_column_ptr = nullable_column->get_nested_column_ptr();
+            is_nullable = true;
+        } else {
+            string_column_ptr = column_with_type_and_name.column;
+            is_nullable = false;
+        }
+
+        // The substring runs on the nested string column, so expose it temporarily.
+        block.replace_by_position(column_pos, std::move(string_column_ptr));
+        block.insert(
+                {int_type->create_column_const(block.rows(), to_field(1)), int_type, "const 1"});
+        block.insert({int_type->create_column_const(block.rows(), to_field(_width)), int_type,
+                      fmt::format("const {}", _width)});
+        block.insert({nullptr, std::make_shared<DataTypeString>(), "result"});
+        ColumnNumbers temp_arguments(3);
+        temp_arguments[0] = column_pos;                     // str column
+        temp_arguments[1] = num_columns_without_result;     // pos
+        temp_arguments[2] = num_columns_without_result + 1; // width
+        const size_t result_column_id = num_columns_without_result + 2;
+
+        SubstringUtil::substring_execute(block, temp_arguments, result_column_id, block.rows());
+
+        // Put the caller's original (possibly nullable) column back so that the block is
+        // left as it was handed in.
+        ColumnPtr original_column = column_with_type_and_name.column;
+        block.replace_by_position(column_pos, std::move(original_column));
+
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(block.get_by_position(result_column_id).column,
+                                                     null_map_column_ptr);
+            Block::erase_useless_column(&block, num_columns_without_result);
+            return {std::move(res_column),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            auto res_column = block.get_by_position(result_column_id).column;
+            Block::erase_useless_column(&block, num_columns_without_result);
+            return {std::move(res_column),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    int _width;
+};
+
+// Iceberg `truncate[W]` transform for INT columns: every value v becomes
+// v - (((v % W) + W) % W).
+class IntegerTruncatePartitionColumnTransform : public PartitionColumnTransform {
+public:
+    IntegerTruncatePartitionColumnTransform(const TypeDescriptor& source_type, int width)
+            : _source_type(source_type), _width(width) {}
+
+    std::string name() const override { return "IntegerTruncate"; }
+
+    const TypeDescriptor& get_result_type() const override { return _source_type; }
+
+    // Builds the truncated Int32 column for block column `column_pos`. A nullable input
+    // yields a nullable result that reuses the input's null map.
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        const ColumnWithTypeAndName& source = block.get_by_position(column_pos);
+
+        // Expand a const column so that every row carries its own value.
+        ColumnPtr data_column = source.column->convert_to_full_column_if_const();
+        CHECK(data_column != nullptr);
+
+        // Strip the nullable wrapper, remembering its null map for the result.
+        ColumnPtr null_map;
+        const bool nullable = data_column->is_nullable();
+        if (nullable) {
+            const auto* wrapper =
+                    reinterpret_cast<const vectorized::ColumnNullable*>(data_column.get());
+            null_map = wrapper->get_null_map_column_ptr();
+            data_column = wrapper->get_nested_column_ptr();
+        }
+        const auto& input = assert_cast<const ColumnInt32*>(data_column.get())->get_data();
+
+        // Truncate row by row.
+        auto result = ColumnInt32::create();
+        ColumnInt32::Container& output = result->get_data();
+        const size_t rows = input.size();
+        output.resize(rows);
+        for (size_t row = 0; row < rows; ++row) {
+            const Int32 value = input[row];
+            output[row] = value - ((value % _width) + _width) % _width;
+        }
+
+        // Wrap the result to match the nullability of the input.
+        auto result_type = DataTypeFactory::instance().create_data_type(get_result_type(), nullable);
+        if (nullable) {
+            auto nullable_result = ColumnNullable::create(std::move(result), null_map);
+            return {std::move(nullable_result), result_type, source.name};
+        }
+        return {std::move(result), result_type, source.name};
+    }
+
+private:
+    TypeDescriptor _source_type;
+    int _width;
+};
+
+// Iceberg `truncate[W]` transform for BIGINT columns: every value v becomes
+// v - (((v % W) + W) % W).
+class BigintTruncatePartitionColumnTransform : public PartitionColumnTransform {
+public:
+    BigintTruncatePartitionColumnTransform(const TypeDescriptor& source_type, int width)
+            : _source_type(source_type), _width(width) {}
+
+    std::string name() const override { return "BigintTruncate"; }
+
+    const TypeDescriptor& get_result_type() const override { return _source_type; }
+
+    // Builds the truncated Int64 column for block column `column_pos`. A nullable input
+    // yields a nullable result that reuses the input's null map.
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        const ColumnWithTypeAndName& source = block.get_by_position(column_pos);
+
+        // Expand a const column so that every row carries its own value.
+        ColumnPtr data_column = source.column->convert_to_full_column_if_const();
+        CHECK(data_column != nullptr);
+
+        // Strip the nullable wrapper, remembering its null map for the result.
+        ColumnPtr null_map;
+        const bool nullable = data_column->is_nullable();
+        if (nullable) {
+            const auto* wrapper =
+                    reinterpret_cast<const vectorized::ColumnNullable*>(data_column.get());
+            null_map = wrapper->get_null_map_column_ptr();
+            data_column = wrapper->get_nested_column_ptr();
+        }
+        const auto& input = assert_cast<const ColumnInt64*>(data_column.get())->get_data();
+
+        // Truncate row by row.
+        auto result = ColumnInt64::create();
+        ColumnInt64::Container& output = result->get_data();
+        const size_t rows = input.size();
+        output.resize(rows);
+        for (size_t row = 0; row < rows; ++row) {
+            const Int64 value = input[row];
+            output[row] = value - ((value % _width) + _width) % _width;
+        }
+
+        // Wrap the result to match the nullability of the input.
+        auto result_type = DataTypeFactory::instance().create_data_type(get_result_type(), nullable);
+        if (nullable) {
+            auto nullable_result = ColumnNullable::create(std::move(result), null_map);
+            return {std::move(nullable_result), result_type, source.name};
+        }
+        return {std::move(result), result_type, source.name};
+    }
+
+private:
+    TypeDescriptor _source_type;
+    int _width;
+};
+
+// Iceberg `truncate[W]` transform for decimal columns: the stored native (unscaled)
+// value v of every row becomes v - (((v % W) + W) % W); the scale is kept.
+template <typename T>
+class DecimalTruncatePartitionColumnTransform : public PartitionColumnTransform {
+public:
+    DecimalTruncatePartitionColumnTransform(const TypeDescriptor& source_type, int width)
+            : _source_type(source_type), _width(width) {}
+
+    std::string name() const override { return "DecimalTruncate"; }
+
+    const TypeDescriptor& get_result_type() const override { return _source_type; }
+
+    // Builds the truncated decimal column for block column `column_pos`. A nullable input
+    // yields a nullable result that reuses the input's null map.
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        const ColumnWithTypeAndName& source = block.get_by_position(column_pos);
+
+        // Strip the nullable wrapper (if any), remembering its null map for the result.
+        ColumnPtr data_column = source.column;
+        ColumnPtr null_map;
+        bool nullable = false;
+        if (auto* wrapper = check_and_get_column<ColumnNullable>(source.column)) {
+            null_map = wrapper->get_null_map_column_ptr();
+            data_column = wrapper->get_nested_column_ptr();
+            nullable = true;
+        }
+
+        const auto* const decimal_column = check_and_get_column<ColumnDecimal<T>>(data_column);
+        const auto& input = decimal_column->get_data();
+
+        auto result = ColumnDecimal<T>::create(input.size(), decimal_column->get_scale());
+        auto& output = result->get_data();
+
+        // Work on the native integer representation of the decimals.
+        using Native = typename T::NativeType;
+        const Native* __restrict src = reinterpret_cast<const Native*>(input.data());
+        Native* __restrict dst = reinterpret_cast<Native*>(output.data());
+        const size_t rows = input.size();
+        for (size_t row = 0; row < rows; ++row) {
+            const Native remainder = ((src[row] % _width) + _width) % _width;
+            dst[row] = src[row] - remainder;
+        }
+
+        // Wrap the result to match the nullability of the input.
+        auto result_type = DataTypeFactory::instance().create_data_type(get_result_type(), nullable);
+        if (nullable) {
+            auto nullable_result = ColumnNullable::create(std::move(result), null_map);
+            return {std::move(nullable_result), result_type, source.name};
+        }
+        return {std::move(result), result_type, source.name};
+    }
+
+private:
+    TypeDescriptor _source_type;
+    int _width;
+};
+
+// Iceberg `bucket[N]` transform for INT columns: each value is widened to a 64-bit
+// integer, hashed with 32-bit Murmur3 (seed 0) and mapped to
+// (hash & INT32_MAX) % N. The result is an INT column of bucket ids.
+class IntBucketPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    IntBucketPartitionColumnTransform(const TypeDescriptor& source_type, int bucket_num)
+            : _source_type(source_type), _bucket_num(bucket_num), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "IntBucket"; }
+
+    const TypeDescriptor& get_result_type() const override { return _target_type; }
+
+    // Builds the bucket-id column for block column `column_pos`. A nullable input yields a
+    // nullable result that reuses the input's null map.
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        //1) get the target column ptr
+        const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos);
+        ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) get the input data from block
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (column_ptr->is_nullable()) {
+            const auto* nullable_column = assert_cast<const ColumnNullable*>(column_ptr.get());
+            is_nullable = true;
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const ColumnInt32*>(column_ptr.get())->get_data();
+
+        //3) do partition routing
+        auto col_res = ColumnInt32::create();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        out_data.resize(in_data.size());
+        const Int32* end_in = in_data.data() + in_data.size();
+        const Int32* __restrict p_in = in_data.data();
+        Int32* __restrict p_out = out_data.data();
+
+        while (p_in < end_in) {
+            Int64 long_value = static_cast<Int64>(*p_in);
+            uint32_t hash_value = HashUtil::murmur_hash3_32(&long_value, sizeof(long_value), 0);
+            // Iceberg defines the bucket as (hash & Integer.MAX_VALUE) % N. Shifting the
+            // hash right first would drop its lowest bit and pick a different bucket than
+            // other Iceberg writers.
+            *p_out = (hash_value & INT32_MAX) % _bucket_num;
+            ++p_in;
+            ++p_out;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr);
+            return {std::move(res_column),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    int _bucket_num;
+    TypeDescriptor _target_type;
+};
+
+// Iceberg `bucket[N]` transform for BIGINT columns: each 64-bit value is hashed with
+// 32-bit Murmur3 (seed 0) and mapped to (hash & INT32_MAX) % N. The result is an INT
+// column of bucket ids.
+class BigintBucketPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    BigintBucketPartitionColumnTransform(const TypeDescriptor& source_type, int bucket_num)
+            : _source_type(source_type), _bucket_num(bucket_num), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "BigintBucket"; }
+
+    const TypeDescriptor& get_result_type() const override { return _target_type; }
+
+    // Builds the bucket-id column for block column `column_pos`. A nullable input yields a
+    // nullable result that reuses the input's null map.
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        //1) get the target column ptr
+        const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos);
+        ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) get the input data from block
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (column_ptr->is_nullable()) {
+            const auto* nullable_column = assert_cast<const ColumnNullable*>(column_ptr.get());
+            is_nullable = true;
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const ColumnInt64*>(column_ptr.get())->get_data();
+
+        //3) do partition routing
+        // The declared result type is TYPE_INT, so the bucket ids must be stored in an
+        // Int32 column (not Int64) for the column and its data type to agree.
+        auto col_res = ColumnInt32::create();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        out_data.resize(in_data.size());
+        const Int64* end_in = in_data.data() + in_data.size();
+        const Int64* __restrict p_in = in_data.data();
+        Int32* __restrict p_out = out_data.data();
+        while (p_in < end_in) {
+            Int64 long_value = *p_in;
+            uint32_t hash_value = HashUtil::murmur_hash3_32(&long_value, sizeof(long_value), 0);
+            // Iceberg defines the bucket as (hash & Integer.MAX_VALUE) % N. Shifting the
+            // hash right first would drop its lowest bit and pick a different bucket than
+            // other Iceberg writers.
+            *p_out = (hash_value & INT32_MAX) % _bucket_num;
+            ++p_in;
+            ++p_out;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr);
+            return {std::move(res_column),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    int _bucket_num;
+    TypeDescriptor _target_type;
+};
+
+// Iceberg `bucket[N]` transform for decimal columns: the native (unscaled) value of each
+// row is serialized with BitUtil::IntToByteBuffer(), hashed with 32-bit Murmur3 (seed 0)
+// and mapped to (hash & INT32_MAX) % N. The result is an INT column of bucket ids.
+template <typename T>
+class DecimalBucketPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    DecimalBucketPartitionColumnTransform(const TypeDescriptor& source_type, int bucket_num)
+            : _source_type(source_type), _bucket_num(bucket_num), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "DecimalBucket"; }
+
+    const TypeDescriptor& get_result_type() const override { return _target_type; }
+
+    // Builds the bucket-id column for block column `column_pos`. A nullable input yields a
+    // nullable result that reuses the input's null map.
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        //1) get the target column ptr
+        const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos);
+        ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) get the input data from block
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (column_ptr->is_nullable()) {
+            const auto* nullable_column = assert_cast<const ColumnNullable*>(column_ptr.get());
+            is_nullable = true;
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const ColumnDecimal<T>*>(column_ptr.get())->get_data();
+
+        //3) do partition routing
+        auto col_res = ColumnInt32::create();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        out_data.resize(in_data.size());
+
+        const typename T::NativeType* __restrict p_in =
+                reinterpret_cast<const typename T::NativeType*>(in_data.data());
+        const typename T::NativeType* end_in = p_in + in_data.size();
+        // The output buffer holds Int32 bucket ids. Writing through a T::NativeType*
+        // would overrun it for decimals wider than 32 bits.
+        Int32* __restrict p_out = out_data.data();
+
+        while (p_in < end_in) {
+            std::string buffer = BitUtil::IntToByteBuffer(*p_in);
+            uint32_t hash_value = HashUtil::murmur_hash3_32(buffer.data(), buffer.size(), 0);
+            // Iceberg defines the bucket as (hash & Integer.MAX_VALUE) % N. Shifting the
+            // hash right first would drop its lowest bit and pick a different bucket than
+            // other Iceberg writers.
+            *p_out = (hash_value & INT32_MAX) % _bucket_num;
+            ++p_in;
+            ++p_out;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr);
+            return {std::move(res_column),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+    std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override {
+        return get_partition_value(type, value);
+    }
+
+    // Renders a bucket id (an Int32 held in `value`) as decimal text, or "null" when
+    // `value` is empty.
+    std::string get_partition_value(const TypeDescriptor& type,
+                                    const std::any& value) const override {
+        if (value.has_value()) {
+            return std::to_string(std::any_cast<Int32>(value));
+        } else {
+            return "null";
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    int _bucket_num;
+    TypeDescriptor _target_type;
+};
+
+// Iceberg `bucket[N]` transform for DATEV2 columns: each date is converted to its day
+// offset from the Unix epoch, widened to a 64-bit integer, hashed with 32-bit Murmur3
+// (seed 0) and mapped to (hash & INT32_MAX) % N. The result is an INT column of
+// bucket ids.
+class DateBucketPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    DateBucketPartitionColumnTransform(const TypeDescriptor& source_type, int bucket_num)
+            : _source_type(source_type), _bucket_num(bucket_num), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "DateBucket"; }
+
+    const TypeDescriptor& get_result_type() const override { return _target_type; }
+
+    // Builds the bucket-id column for block column `column_pos`. A nullable input yields a
+    // nullable result that reuses the input's null map.
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        //1) get the target column ptr
+        const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos);
+        ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) get the input data from block
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (column_ptr->is_nullable()) {
+            const auto* nullable_column = assert_cast<const ColumnNullable*>(column_ptr.get());
+            is_nullable = true;
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const ColumnDateV2*>(column_ptr.get())->get_data();
+
+        //3) do partition routing
+        auto col_res = ColumnInt32::create();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        out_data.resize(in_data.size());
+        const auto* end_in = in_data.data() + in_data.size();
+
+        const auto* __restrict p_in = in_data.data();
+        auto* __restrict p_out = out_data.data();
+
+        // Subtracted from daynr() to obtain days since the Unix epoch
+        // (presumably daynr() of 1970-01-01 -- TODO confirm).
+        constexpr int32_t UNIX_EPOCH_DAYNR = 719528;
+
+        while (p_in < end_in) {
+            DateV2Value<DateV2ValueType> value =
+                    binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(*p_in);
+
+            int32_t days_from_unix_epoch = value.daynr() - UNIX_EPOCH_DAYNR;
+            Int64 long_value = static_cast<Int64>(days_from_unix_epoch);
+            uint32_t hash_value = HashUtil::murmur_hash3_32(&long_value, sizeof(long_value), 0);
+
+            // Iceberg defines the bucket as (hash & Integer.MAX_VALUE) % N. Shifting the
+            // hash right first would drop its lowest bit and pick a different bucket than
+            // other Iceberg writers.
+            *p_out = (hash_value & INT32_MAX) % _bucket_num;
+            ++p_in;
+            ++p_out;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr);
+            return {std::move(res_column),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    int _bucket_num;
+    TypeDescriptor _target_type;
+};
+
+// Bucket partition transform for DATETIMEV2 columns: hashes each value and maps it to one of
+// `bucket_num` INT32 buckets.
+class TimestampBucketPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    TimestampBucketPartitionColumnTransform(const TypeDescriptor& source_type, int bucket_num)
+            : _source_type(source_type), _bucket_num(bucket_num), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "TimestampBucket"; }
+
+    const TypeDescriptor& get_result_type() const override { return _target_type; }
+
+    // Builds the bucket column for the column at `column_pos` of `block`.
+    // The result is an INT32 column (wrapped in a nullable column that reuses the input's
+    // null map when the input is nullable) carrying the input column's name.
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        //1) get the target column ptr
+        const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos);
+        ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) get the input data from block
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (column_ptr->is_nullable()) {
+            const ColumnNullable* nullable_column =
+                    reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get());
+            is_nullable = true;
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const ColumnDateTimeV2*>(column_ptr.get())->get_data();
+
+        //3) do partition routing
+        auto col_res = ColumnInt32::create();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        out_data.resize(in_data.size());
+        const auto* end_in = in_data.data() + in_data.size();
+
+        const auto* __restrict p_in = in_data.data();
+        auto* __restrict p_out = out_data.data();
+
+        while (p_in < end_in) {
+            DateV2Value<DateTimeV2ValueType> value =
+                    binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in);
+
+            // Convert to a unix timestamp in UTC; a failed conversion is logged and hashed as 0.
+            int64_t timestamp;
+            if (!value.unix_timestamp(&timestamp, "UTC")) {
+                LOG(WARNING) << "Failed to call unix_timestamp :" << value.debug_string();
+                timestamp = 0;
+            }
+            // The hashed quantity is the timestamp scaled by 1000000.
+            // NOTE(review): no sub-second component of `value` is added here - confirm intended.
+            Int64 long_value = static_cast<Int64>(timestamp) * 1000000;
+            uint32_t hash_value = HashUtil::murmur_hash3_32(&long_value, sizeof(long_value), 0);
+
+            // Mask to a non-negative value before taking the modulo.
+            *p_out = (hash_value & INT32_MAX) % _bucket_num;
+            ++p_in;
+            ++p_out;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr);
+            return {std::move(res_column),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+    // Renders a bucket id as its decimal string, or "null" when `value` holds nothing.
+    std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override {
+        if (value.has_value()) {
+            return std::to_string(std::any_cast<Int32>(value));
+        } else {
+            return "null";
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    int _bucket_num;
+    TypeDescriptor _target_type;
+};
+
+// Bucket partition transform for string columns: hashes the bytes of each string and maps it
+// to one of `bucket_num` INT32 buckets.
+class StringBucketPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    StringBucketPartitionColumnTransform(const TypeDescriptor& source_type, int bucket_num)
+            : _source_type(source_type), _bucket_num(bucket_num), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "StringBucket"; }
+
+    const TypeDescriptor& get_result_type() const override { return _target_type; }
+
+    // Builds the bucket column for the column at `column_pos` of `block`.
+    // The result is an INT32 column (wrapped in a nullable column that reuses the input's
+    // null map when the input is nullable) carrying the input column's name.
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        //1) get the target column ptr
+        const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos);
+        ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) get the input data from block
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (column_ptr->is_nullable()) {
+            const ColumnNullable* nullable_column =
+                    reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get());
+            is_nullable = true;
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto* str_col = assert_cast<const ColumnString*>(column_ptr.get());
+
+        //3) do partition routing
+        auto col_res = ColumnInt32::create();
+        const auto& data = str_col->get_chars();
+        const auto& offsets = str_col->get_offsets();
+
+        const size_t offset_size = offsets.size();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        out_data.resize(offset_size);
+        auto* __restrict p_out = out_data.data();
+
+        // offsets[i] is the end of row i, so row i spans [end of row i-1, offsets[i]).
+        // The start of the first row is tracked explicitly as 0 instead of reading
+        // offsets[-1], so the loop never indexes before the first element.
+        ColumnString::Offset begin = 0;
+        for (size_t i = 0; i < offset_size; ++i) {
+            const ColumnString::Offset end = offsets[i];
+            const unsigned char* raw_str = &data[begin];
+            ColumnString::Offset size = end - begin;
+            uint32_t hash_value = HashUtil::murmur_hash3_32(raw_str, size, 0);
+
+            // Mask to a non-negative value before taking the modulo.
+            *p_out = (hash_value & INT32_MAX) % _bucket_num;
+            ++p_out;
+            begin = end;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr);
+            return {std::move(res_column),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    int _bucket_num;
+    TypeDescriptor _target_type;
+};
+
+// Year partition transform for DATEV2 columns; produces an INT32 column.
+class DateYearPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    DateYearPartitionColumnTransform(const TypeDescriptor& source_type)
+            : _source_type(source_type), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "DateYear"; }
+
+    const TypeDescriptor& get_result_type() const override { return _target_type; }
+
+    // For every row of the column at `column_pos`, stores datetime_diff<YEAR> between
+    // PartitionColumnTransformUtils::epoch_date() and the row's value. A nullable input yields
+    // a nullable result that reuses the input's null map.
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        // Materialize the source column in case it is a const column.
+        const ColumnWithTypeAndName& src = block.get_by_position(column_pos);
+        ColumnPtr data_column = src.column->convert_to_full_column_if_const();
+        CHECK(data_column != nullptr);
+
+        // Strip the nullable wrapper, remembering its null map for the result.
+        ColumnPtr null_map;
+        const bool nullable = data_column->is_nullable();
+        if (nullable) {
+            const auto* wrapper =
+                    reinterpret_cast<const vectorized::ColumnNullable*>(data_column.get());
+            null_map = wrapper->get_null_map_column_ptr();
+            data_column = wrapper->get_nested_column_ptr();
+        }
+        const auto& values = assert_cast<const ColumnDateV2*>(data_column.get())->get_data();
+
+        // One output slot per input row.
+        auto result = ColumnInt32::create();
+        ColumnInt32::Container& diffs = result->get_data();
+        diffs.resize(values.size());
+
+        const auto* __restrict src_it = values.data();
+        const auto* src_end = values.data() + values.size();
+        auto* __restrict dst_it = diffs.data();
+        for (; src_it < src_end; ++src_it, ++dst_it) {
+            DateV2Value<DateV2ValueType> date =
+                    binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(*(UInt32*)src_it);
+            *dst_it = datetime_diff<YEAR>(PartitionColumnTransformUtils::epoch_date(), date);
+        }
+
+        // Re-wrap as nullable only when the input was nullable.
+        if (!nullable) {
+            return {std::move(result),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    src.name};
+        }
+        auto wrapped = ColumnNullable::create(std::move(result), null_map);
+        return {std::move(wrapped),
+                DataTypeFactory::instance().create_data_type(get_result_type(), true), src.name};
+    }
+
+    // Formats the value via PartitionColumnTransformUtils::human_year, or "null" when empty.
+    std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override {
+        if (!value.has_value()) {
+            return "null";
+        }
+        return PartitionColumnTransformUtils::human_year(std::any_cast<Int32>(value));
+    }
+
+private:
+    TypeDescriptor _source_type;
+    TypeDescriptor _target_type;
+};
+
+// Year partition transform for DATETIMEV2 columns; produces an INT32 column.
+class TimestampYearPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    TimestampYearPartitionColumnTransform(const TypeDescriptor& source_type)
+            : _source_type(source_type), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "TimestampYear"; }
+
+    const TypeDescriptor& get_result_type() const override { return _target_type; }
+
+    // For every row of the column at `column_pos`, stores datetime_diff<YEAR> between
+    // PartitionColumnTransformUtils::epoch_datetime() and the row's value. A nullable input
+    // yields a nullable result that reuses the input's null map.
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        // Materialize the source column in case it is a const column.
+        const ColumnWithTypeAndName& src = block.get_by_position(column_pos);
+        ColumnPtr data_column = src.column->convert_to_full_column_if_const();
+        CHECK(data_column != nullptr);
+
+        // Strip the nullable wrapper, remembering its null map for the result.
+        ColumnPtr null_map;
+        const bool nullable = data_column->is_nullable();
+        if (nullable) {
+            const auto* wrapper =
+                    reinterpret_cast<const vectorized::ColumnNullable*>(data_column.get());
+            null_map = wrapper->get_null_map_column_ptr();
+            data_column = wrapper->get_nested_column_ptr();
+        }
+        const auto& values = assert_cast<const ColumnDateTimeV2*>(data_column.get())->get_data();
+
+        // One output slot per input row.
+        auto result = ColumnInt32::create();
+        ColumnInt32::Container& diffs = result->get_data();
+        diffs.resize(values.size());
+
+        const auto* __restrict src_it = values.data();
+        const auto* src_end = values.data() + values.size();
+        auto* __restrict dst_it = diffs.data();
+        for (; src_it < src_end; ++src_it, ++dst_it) {
+            DateV2Value<DateTimeV2ValueType> ts =
+                    binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(UInt64*)src_it);
+            *dst_it = datetime_diff<YEAR>(PartitionColumnTransformUtils::epoch_datetime(), ts);
+        }
+
+        // Re-wrap as nullable only when the input was nullable.
+        if (!nullable) {
+            return {std::move(result),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    src.name};
+        }
+        auto wrapped = ColumnNullable::create(std::move(result), null_map);
+        return {std::move(wrapped),
+                DataTypeFactory::instance().create_data_type(get_result_type(), true), src.name};
+    }
+
+    // Formats the value via PartitionColumnTransformUtils::human_year, or "null" when empty.
+    std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override {
+        if (!value.has_value()) {
+            return "null";
+        }
+        return PartitionColumnTransformUtils::human_year(std::any_cast<Int32>(value));
+    }
+
+private:
+    TypeDescriptor _source_type;
+    TypeDescriptor _target_type;
+};
+
+// Month partition transform for DATEV2 columns; produces an INT32 column.
+class DateMonthPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    DateMonthPartitionColumnTransform(const TypeDescriptor& source_type)
+            : _source_type(source_type), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "DateMonth"; }
+
+    const TypeDescriptor& get_result_type() const override { return _target_type; }
+
+    // For every row of the column at `column_pos`, stores datetime_diff<MONTH> between
+    // PartitionColumnTransformUtils::epoch_date() and the row's value. A nullable input yields
+    // a nullable result that reuses the input's null map.
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        // Materialize the source column in case it is a const column.
+        const ColumnWithTypeAndName& src = block.get_by_position(column_pos);
+        ColumnPtr data_column = src.column->convert_to_full_column_if_const();
+        CHECK(data_column != nullptr);
+
+        // Strip the nullable wrapper, remembering its null map for the result.
+        ColumnPtr null_map;
+        const bool nullable = data_column->is_nullable();
+        if (nullable) {
+            const auto* wrapper =
+                    reinterpret_cast<const vectorized::ColumnNullable*>(data_column.get());
+            null_map = wrapper->get_null_map_column_ptr();
+            data_column = wrapper->get_nested_column_ptr();
+        }
+        const auto& values = assert_cast<const ColumnDateV2*>(data_column.get())->get_data();
+
+        // One output slot per input row.
+        auto result = ColumnInt32::create();
+        ColumnInt32::Container& diffs = result->get_data();
+        diffs.resize(values.size());
+
+        const auto* __restrict src_it = values.data();
+        const auto* src_end = values.data() + values.size();
+        auto* __restrict dst_it = diffs.data();
+        for (; src_it < src_end; ++src_it, ++dst_it) {
+            DateV2Value<DateV2ValueType> date =
+                    binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(*(UInt32*)src_it);
+            *dst_it = datetime_diff<MONTH>(PartitionColumnTransformUtils::epoch_date(), date);
+        }
+
+        // Re-wrap as nullable only when the input was nullable.
+        if (!nullable) {
+            return {std::move(result),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    src.name};
+        }
+        auto wrapped = ColumnNullable::create(std::move(result), null_map);
+        return {std::move(wrapped),
+                DataTypeFactory::instance().create_data_type(get_result_type(), true), src.name};
+    }
+
+    // Formats the value via PartitionColumnTransformUtils::human_month, or "null" when empty.
+    std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override {
+        if (!value.has_value()) {
+            return "null";
+        }
+        return PartitionColumnTransformUtils::human_month(std::any_cast<Int32>(value));
+    }
+
+private:
+    TypeDescriptor _source_type;
+    TypeDescriptor _target_type;
+};
+
+// Month partition transform for DATETIMEV2 columns; produces an INT32 column.
+class TimestampMonthPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    TimestampMonthPartitionColumnTransform(const TypeDescriptor& source_type)
+            : _source_type(source_type), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "TimestampMonth"; }
+
+    const TypeDescriptor& get_result_type() const override { return _target_type; }
+
+    // For every row of the column at `column_pos`, stores datetime_diff<MONTH> between
+    // PartitionColumnTransformUtils::epoch_datetime() and the row's value. A nullable input
+    // yields a nullable result that reuses the input's null map.
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        // Materialize the source column in case it is a const column.
+        const ColumnWithTypeAndName& src = block.get_by_position(column_pos);
+        ColumnPtr data_column = src.column->convert_to_full_column_if_const();
+        CHECK(data_column != nullptr);
+
+        // Strip the nullable wrapper, remembering its null map for the result.
+        ColumnPtr null_map;
+        const bool nullable = data_column->is_nullable();
+        if (nullable) {
+            const auto* wrapper =
+                    reinterpret_cast<const vectorized::ColumnNullable*>(data_column.get());
+            null_map = wrapper->get_null_map_column_ptr();
+            data_column = wrapper->get_nested_column_ptr();
+        }
+        const auto& values = assert_cast<const ColumnDateTimeV2*>(data_column.get())->get_data();
+
+        // One output slot per input row.
+        auto result = ColumnInt32::create();
+        ColumnInt32::Container& diffs = result->get_data();
+        diffs.resize(values.size());
+
+        const auto* __restrict src_it = values.data();
+        const auto* src_end = values.data() + values.size();
+        auto* __restrict dst_it = diffs.data();
+        for (; src_it < src_end; ++src_it, ++dst_it) {
+            DateV2Value<DateTimeV2ValueType> ts =
+                    binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(UInt64*)src_it);
+            *dst_it = datetime_diff<MONTH>(PartitionColumnTransformUtils::epoch_datetime(), ts);
+        }
+
+        // Re-wrap as nullable only when the input was nullable.
+        if (!nullable) {
+            return {std::move(result),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    src.name};
+        }
+        auto wrapped = ColumnNullable::create(std::move(result), null_map);
+        return {std::move(wrapped),
+                DataTypeFactory::instance().create_data_type(get_result_type(), true), src.name};
+    }
+
+    // Formats the value via PartitionColumnTransformUtils::human_month, or "null" when empty.
+    std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override {
+        if (!value.has_value()) {
+            return "null";
+        }
+        return PartitionColumnTransformUtils::human_month(std::any_cast<Int32>(value));
+    }
+
+private:
+    TypeDescriptor _source_type;
+    TypeDescriptor _target_type;
+};
+
+// Day partition transform for DATEV2 columns; produces an INT32 column.
+class DateDayPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    DateDayPartitionColumnTransform(const TypeDescriptor& source_type)
+            : _source_type(source_type), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "DateDay"; }
+
+    const TypeDescriptor& get_result_type() const override { return _target_type; }
+
+    // For every row of the column at `column_pos`, stores datetime_diff<DAY> between
+    // PartitionColumnTransformUtils::epoch_date() and the row's value. A nullable input yields
+    // a nullable result that reuses the input's null map.
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        // Materialize the source column in case it is a const column.
+        const ColumnWithTypeAndName& src = block.get_by_position(column_pos);
+        ColumnPtr data_column = src.column->convert_to_full_column_if_const();
+        CHECK(data_column != nullptr);
+
+        // Strip the nullable wrapper, remembering its null map for the result.
+        ColumnPtr null_map;
+        const bool nullable = data_column->is_nullable();
+        if (nullable) {
+            const auto* wrapper =
+                    reinterpret_cast<const vectorized::ColumnNullable*>(data_column.get());
+            null_map = wrapper->get_null_map_column_ptr();
+            data_column = wrapper->get_nested_column_ptr();
+        }
+        const auto& values = assert_cast<const ColumnDateV2*>(data_column.get())->get_data();
+
+        // One output slot per input row.
+        auto result = ColumnInt32::create();
+        ColumnInt32::Container& diffs = result->get_data();
+        diffs.resize(values.size());
+
+        const auto* __restrict src_it = values.data();
+        const auto* src_end = values.data() + values.size();
+        auto* __restrict dst_it = diffs.data();
+        for (; src_it < src_end; ++src_it, ++dst_it) {
+            DateV2Value<DateV2ValueType> date =
+                    binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(*(UInt32*)src_it);
+            *dst_it = datetime_diff<DAY>(PartitionColumnTransformUtils::epoch_date(), date);
+        }
+
+        // Re-wrap as nullable only when the input was nullable.
+        if (!nullable) {
+            return {std::move(result),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    src.name};
+        }
+        auto wrapped = ColumnNullable::create(std::move(result), null_map);
+        return {std::move(wrapped),
+                DataTypeFactory::instance().create_data_type(get_result_type(), true), src.name};
+    }
+
+    // The human-readable form is the same string as the partition value.
+    std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override {
+        return get_partition_value(type, value);
+    }
+
+    // Formats the value via PartitionColumnTransformUtils::human_day, or "null" when empty.
+    std::string get_partition_value(const TypeDescriptor& type,
+                                    const std::any& value) const override {
+        if (!value.has_value()) {
+            return "null";
+        }
+        const int days = std::any_cast<Int32>(value);
+        return PartitionColumnTransformUtils::human_day(days);
+    }
+
+private:
+    TypeDescriptor _source_type;
+    TypeDescriptor _target_type;
+};
+
+// Day partition transform for DATETIMEV2 columns; produces an INT32 column.
+class TimestampDayPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    TimestampDayPartitionColumnTransform(const TypeDescriptor& source_type)
+            : _source_type(source_type), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "TimestampDay"; }
+
+    const TypeDescriptor& get_result_type() const override { return _target_type; }
+
+    // For every row of the column at `column_pos`, stores datetime_diff<DAY> between
+    // PartitionColumnTransformUtils::epoch_datetime() and the row's value. A nullable input
+    // yields a nullable result that reuses the input's null map.
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        // Materialize the source column in case it is a const column.
+        const ColumnWithTypeAndName& src = block.get_by_position(column_pos);
+        ColumnPtr data_column = src.column->convert_to_full_column_if_const();
+        CHECK(data_column != nullptr);
+
+        // Strip the nullable wrapper, remembering its null map for the result.
+        ColumnPtr null_map;
+        const bool nullable = data_column->is_nullable();
+        if (nullable) {
+            const auto* wrapper =
+                    reinterpret_cast<const vectorized::ColumnNullable*>(data_column.get());
+            null_map = wrapper->get_null_map_column_ptr();
+            data_column = wrapper->get_nested_column_ptr();
+        }
+        const auto& values = assert_cast<const ColumnDateTimeV2*>(data_column.get())->get_data();
+
+        // One output slot per input row.
+        auto result = ColumnInt32::create();
+        ColumnInt32::Container& diffs = result->get_data();
+        diffs.resize(values.size());
+
+        const auto* __restrict src_it = values.data();
+        const auto* src_end = values.data() + values.size();
+        auto* __restrict dst_it = diffs.data();
+        for (; src_it < src_end; ++src_it, ++dst_it) {
+            DateV2Value<DateTimeV2ValueType> ts =
+                    binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(UInt64*)src_it);
+            *dst_it = datetime_diff<DAY>(PartitionColumnTransformUtils::epoch_datetime(), ts);
+        }
+
+        // Re-wrap as nullable only when the input was nullable.
+        if (!nullable) {
+            return {std::move(result),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    src.name};
+        }
+        auto wrapped = ColumnNullable::create(std::move(result), null_map);
+        return {std::move(wrapped),
+                DataTypeFactory::instance().create_data_type(get_result_type(), true), src.name};
+    }
+
+    // The human-readable form is the same string as the partition value.
+    std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override {
+        return get_partition_value(type, value);
+    }
+
+    // Formats the value via PartitionColumnTransformUtils::human_day, or "null" when empty.
+    std::string get_partition_value(const TypeDescriptor& type,
+                                    const std::any& value) const override {
+        if (!value.has_value()) {
+            return "null";
+        }
+        return PartitionColumnTransformUtils::human_day(std::any_cast<Int32>(value));
+    }
+
+private:
+    TypeDescriptor _source_type;
+    TypeDescriptor _target_type;
+};
+
+// Hour partition transform for DATETIMEV2 columns; produces an INT32 column.
+class TimestampHourPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    TimestampHourPartitionColumnTransform(const TypeDescriptor& source_type)
+            : _source_type(source_type), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "TimestampHour"; }
+
+    const TypeDescriptor& get_result_type() const override { return _target_type; }
+
+    // For every row of the column at `column_pos`, stores datetime_diff<HOUR> between
+    // PartitionColumnTransformUtils::epoch_datetime() and the row's value. A nullable input
+    // yields a nullable result that reuses the input's null map.
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        // Materialize the source column in case it is a const column.
+        const ColumnWithTypeAndName& src = block.get_by_position(column_pos);
+        ColumnPtr data_column = src.column->convert_to_full_column_if_const();
+        CHECK(data_column != nullptr);
+
+        // Strip the nullable wrapper, remembering its null map for the result.
+        ColumnPtr null_map;
+        const bool nullable = data_column->is_nullable();
+        if (nullable) {
+            const auto* wrapper =
+                    reinterpret_cast<const vectorized::ColumnNullable*>(data_column.get());
+            null_map = wrapper->get_null_map_column_ptr();
+            data_column = wrapper->get_nested_column_ptr();
+        }
+        const auto& values = assert_cast<const ColumnDateTimeV2*>(data_column.get())->get_data();
+
+        // One output slot per input row.
+        auto result = ColumnInt32::create();
+        ColumnInt32::Container& diffs = result->get_data();
+        diffs.resize(values.size());
+
+        const auto* __restrict src_it = values.data();
+        const auto* src_end = values.data() + values.size();
+        auto* __restrict dst_it = diffs.data();
+        for (; src_it < src_end; ++src_it, ++dst_it) {
+            DateV2Value<DateTimeV2ValueType> ts =
+                    binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(UInt64*)src_it);
+            *dst_it = datetime_diff<HOUR>(PartitionColumnTransformUtils::epoch_datetime(), ts);
+        }
+
+        // Re-wrap as nullable only when the input was nullable.
+        if (!nullable) {
+            return {std::move(result),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    src.name};
+        }
+        auto wrapped = ColumnNullable::create(std::move(result), null_map);
+        return {std::move(wrapped),
+                DataTypeFactory::instance().create_data_type(get_result_type(), true), src.name};
+    }
+
+    // Formats the value via PartitionColumnTransformUtils::human_hour, or "null" when empty.
+    std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override {
+        if (!value.has_value()) {
+            return "null";
+        }
+        return PartitionColumnTransformUtils::human_hour(std::any_cast<Int32>(value));
+    }
+
+private:
+    TypeDescriptor _source_type;
+    TypeDescriptor _target_type;
+};
+
+// Void partition transform: keeps the source type but reports every row as null.
+class VoidPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    VoidPartitionColumnTransform(const TypeDescriptor& source_type)
+            : _source_type(source_type), _target_type(source_type) {}
+
+    std::string name() const override { return "Void"; }
+
+    const TypeDescriptor& get_result_type() const override { return _target_type; }
+
+    // Returns the (un-nulled) data of the column at `column_pos` wrapped in a nullable column
+    // whose null map is filled with 1 for every row, typed as the nullable result type and
+    // carrying the input column's name.
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos);
-    virtual ColumnWithTypeAndName apply(Block& block, int idx);
+        // Use the nested data column when the input is already nullable; its own null map is
+        // not needed because a fresh all-null map is built below.
+        ColumnPtr column_ptr;
+        if (auto* nullable_column =
+                    check_and_get_column<ColumnNullable>(column_with_type_and_name.column)) {
+            column_ptr = nullable_column->get_nested_column_ptr();
+        } else {
+            column_ptr = column_with_type_and_name.column;
+        }
+        // Read the row count before column_ptr is handed to create(), so the size is never
+        // taken from a pointer that is being moved from in the same expression.
+        const size_t rows = column_ptr->size();
+        auto res_column =
+                ColumnNullable::create(std::move(column_ptr), ColumnUInt8::create(rows, 1));
+        return {std::move(res_column),
+                DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                column_with_type_and_name.name};
+    }
 private:
     TypeDescriptor _source_type;
+    TypeDescriptor _target_type;
 };
} // namespace vectorized
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
index 2703330406c..e59b0593f7b 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -329,8 +329,8 @@ std::vector<std::string>
VIcebergTableWriter::_partition_values(
TypeDescriptor result_type =
iceberg_partition_column.partition_column_transform().get_result_type();
partition_values.emplace_back(
-
iceberg_partition_column.partition_column_transform().to_human_string(result_type,
-
data.get(i)));
+
iceberg_partition_column.partition_column_transform().get_partition_value(
+ result_type, data.get(i)));
}
return partition_values;
@@ -407,21 +407,25 @@ std::optional<PartitionData>
VIcebergTableWriter::_get_partition_data(
std::any VIcebergTableWriter::_get_iceberg_partition_value(
const TypeDescriptor& type_desc, const ColumnWithTypeAndName&
partition_column,
int position) {
- ColumnPtr column;
- if (auto* nullable_column =
check_and_get_column<ColumnNullable>(*partition_column.column)) {
+ //1) get the partition column ptr
+ ColumnPtr col_ptr =
partition_column.column->convert_to_full_column_if_const();
+ CHECK(col_ptr != nullptr);
+ if (col_ptr->is_nullable()) {
+ const ColumnNullable* nullable_column =
+ reinterpret_cast<const
vectorized::ColumnNullable*>(col_ptr.get());
auto* __restrict null_map_data =
nullable_column->get_null_map_data().data();
if (null_map_data[position]) {
return std::any();
}
- column = nullable_column->get_nested_column_ptr();
- } else {
- column = partition_column.column;
+ col_ptr = nullable_column->get_nested_column_ptr();
}
- auto [item, size] = column->get_data_at(position);
+
+ //2) get partition field data from partition block
+ auto [item, size] = col_ptr->get_data_at(position);
switch (type_desc.type) {
case TYPE_BOOLEAN: {
vectorized::Field field =
- vectorized::check_and_get_column<const
ColumnUInt8>(*column)->operator[](position);
+ vectorized::check_and_get_column<const
ColumnUInt8>(*col_ptr)->operator[](position);
return field.get<bool>();
}
case TYPE_TINYINT: {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/info/SimpleTableInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/common/info/SimpleTableInfo.java
new file mode 100644
index 00000000000..6fdb27e1d0b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/info/SimpleTableInfo.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+//
https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/AnalysisException.java
+// and modified by Doris
+
+package org.apache.doris.common.info;
+
+import java.util.Objects;
+
+public class SimpleTableInfo {
+
+ private final String dbName;
+ private final String tbName;
+
+ public SimpleTableInfo(String dbName, String tbName) {
+ this.dbName = dbName;
+ this.tbName = tbName;
+ }
+
+ public String getDbName() {
+ return dbName;
+ }
+
+ public String getTbName() {
+ return tbName;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(dbName, tbName);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+
+ SimpleTableInfo that = (SimpleTableInfo) other;
+ return Objects.equals(dbName, that.dbName) && Objects.equals(tbName,
that.tbName);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s.%s", dbName, tbName);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index acda08b7378..68064c4e439 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
@@ -85,6 +86,20 @@ public class IcebergMetadataCache {
return tableCache.get(key);
}
+    /**
+     * Fetches the table via {@code getIcebergTable} and returns a
+     * {@code SerializableTable.copyOf} copy of it. Both steps run while holding this
+     * object's monitor.
+     */
+    public Table getAndCloneTable(CatalogIf catalog, String dbName, String tbName) {
+        synchronized (this) {
+            Table cached = getIcebergTable(catalog, dbName, tbName);
+            return SerializableTable.copyOf(cached);
+        }
+    }
+
+    /**
+     * Builds the cache key for the given catalog/database/table and returns the result of
+     * calling {@code loadTable} on it directly.
+     */
+    public Table getRemoteTable(CatalogIf catalog, String dbName, String tbName) {
+        return loadTable(IcebergMetadataCacheKey.of(catalog, dbName, tbName));
+    }
+
@NotNull
private List<Snapshot> loadSnapshots(IcebergMetadataCacheKey key) {
Table icebergTable = getIcebergTable(key.catalog, key.dbName,
key.tableName);
@@ -116,7 +131,7 @@ public class IcebergMetadataCache {
public void invalidateCatalogCache(long catalogId) {
snapshotListCache.asMap().keySet().stream()
.filter(key -> key.catalog.getId() == catalogId)
- .forEach(snapshotListCache::invalidate);
+ .forEach(snapshotListCache::invalidate);
tableCache.asMap().entrySet().stream()
.filter(entry -> entry.getKey().catalog.getId() == catalogId)
@@ -130,7 +145,7 @@ public class IcebergMetadataCache {
snapshotListCache.asMap().keySet().stream()
.filter(key -> key.catalog.getId() == catalogId &&
key.dbName.equals(dbName) && key.tableName.equals(
tblName))
- .forEach(snapshotListCache::invalidate);
+ .forEach(snapshotListCache::invalidate);
tableCache.asMap().entrySet().stream()
.filter(entry -> {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index 7161f48680a..7bd8409ca2c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -64,6 +64,10 @@ public class IcebergMetadataOps implements
ExternalMetadataOps {
return catalog;
}
+ /** Returns the {@code dorisCatalog} field held by this instance. */
+ public IcebergExternalCatalog getExternalCatalog() {
+ return dorisCatalog;
+ }
+
@Override
public void close() {
}
@@ -173,4 +177,5 @@ public class IcebergMetadataOps implements
ExternalMetadataOps {
catalog.dropTable(TableIdentifier.of(dbName, tableName));
db.setUnInitialized(true);
}
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
index 5025e075142..a3a978ccd7a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
@@ -21,31 +21,39 @@
package org.apache.doris.datasource.iceberg;
import org.apache.doris.common.UserException;
-import org.apache.doris.thrift.TFileContent;
+import org.apache.doris.common.info.SimpleTableInfo;
+import org.apache.doris.datasource.iceberg.helper.IcebergWriterHelper;
+import
org.apache.doris.nereids.trees.plans.commands.insert.BaseExternalTableInsertCommandContext;
+import
org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
import org.apache.doris.thrift.TIcebergCommitData;
+import org.apache.doris.thrift.TUpdateMode;
import org.apache.doris.transaction.Transaction;
-import com.google.common.base.VerifyException;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.DataFiles;
-import org.apache.iceberg.FileContent;
-import org.apache.iceberg.Metrics;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.WriteResult;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
-import java.util.stream.Collectors;
public class IcebergTransaction implements Transaction {
private static final Logger LOG =
LogManager.getLogger(IcebergTransaction.class);
+
private final IcebergMetadataOps ops;
+ private SimpleTableInfo tableInfo;
+ private Table table;
+
+
private org.apache.iceberg.Transaction transaction;
private final List<TIcebergCommitData> commitDataList =
Lists.newArrayList();
@@ -59,140 +67,123 @@ public class IcebergTransaction implements Transaction {
}
}
- public void beginInsert(String dbName, String tbName) {
- Table icebergTable =
ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbName));
- transaction = icebergTable.newTransaction();
+ public void beginInsert(SimpleTableInfo tableInfo) {
+ this.tableInfo = tableInfo;
+ this.table = getNativeTable(tableInfo);
+ this.transaction = table.newTransaction();
}
- public void finishInsert() {
- Table icebergTable = transaction.table();
- AppendFiles appendFiles = transaction.newAppend();
-
- for (CommitTaskData task : convertToCommitTaskData()) {
- DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
- .withPath(task.getPath())
- .withFileSizeInBytes(task.getFileSizeInBytes())
- .withFormat(IcebergUtils.getFileFormat(icebergTable))
- .withMetrics(task.getMetrics());
-
- if (icebergTable.spec().isPartitioned()) {
- List<String> partitionValues = task.getPartitionValues()
- .orElseThrow(() -> new VerifyException("No partition
data for partitioned table"));
- builder.withPartitionValues(partitionValues);
- }
- appendFiles.appendFile(builder.build());
+ public void finishInsert(SimpleTableInfo tableInfo,
Optional<InsertCommandContext> insertCtx) {
+ if (LOG.isDebugEnabled()) {
+ LOG.info("iceberg table {} insert table finished!", tableInfo);
}
- // in appendFiles.commit, it will generate metadata(manifest and
snapshot)
- // after appendFiles.commit, in current transaction, you can already
see the new snapshot
- appendFiles.commit();
- }
-
- public List<CommitTaskData> convertToCommitTaskData() {
- List<CommitTaskData> commitTaskData = new ArrayList<>();
- for (TIcebergCommitData data : this.commitDataList) {
- commitTaskData.add(new CommitTaskData(
- data.getFilePath(),
- data.getFileSize(),
- new Metrics(
- data.getRowCount(),
- Collections.EMPTY_MAP,
- Collections.EMPTY_MAP,
- Collections.EMPTY_MAP,
- Collections.EMPTY_MAP
- ),
- data.isSetPartitionValues() ?
Optional.of(data.getPartitionValues()) : Optional.empty(),
- convertToFileContent(data.getFileContent()),
- data.isSetReferencedDataFiles() ?
Optional.of(data.getReferencedDataFiles()) : Optional.empty()
- ));
+ //create and start the iceberg transaction
+ TUpdateMode updateMode = TUpdateMode.APPEND;
+ if (insertCtx.isPresent()) {
+ updateMode = ((BaseExternalTableInsertCommandContext)
insertCtx.get()).isOverwrite() ? TUpdateMode.OVERWRITE
+ : TUpdateMode.APPEND;
}
- return commitTaskData;
+ updateManifestAfterInsert(updateMode);
}
- private FileContent convertToFileContent(TFileContent content) {
- if (content.equals(TFileContent.DATA)) {
- return FileContent.DATA;
- } else if (content.equals(TFileContent.POSITION_DELETES)) {
- return FileContent.POSITION_DELETES;
+ private void updateManifestAfterInsert(TUpdateMode updateMode) {
+ PartitionSpec spec = table.spec();
+ FileFormat fileFormat = IcebergUtils.getFileFormat(table);
+
+ //convert commitDataList to writeResult
+ WriteResult writeResult = IcebergWriterHelper
+ .convertToWriterResult(fileFormat, spec, commitDataList);
+ List<WriteResult> pendingResults = Lists.newArrayList(writeResult);
+
+ if (spec.isPartitioned()) {
+ partitionManifestUpdate(updateMode, table, pendingResults);
+ if (LOG.isDebugEnabled()) {
+ LOG.info("{} {} table partition manifest successful and
writeResult : {}..", tableInfo, updateMode,
+ writeResult);
+ }
} else {
- return FileContent.EQUALITY_DELETES;
+ tableManifestUpdate(updateMode, table, pendingResults);
+ if (LOG.isDebugEnabled()) {
+ LOG.info("{} {} table manifest successful and writeResult :
{}..", tableInfo, updateMode,
+ writeResult);
+ }
}
}
@Override
public void commit() throws UserException {
- // Externally readable
- // Manipulate the relevant data so that others can also see the latest
table, such as:
- // 1. hadoop: it will change the version number information in
'version-hint.text'
- // 2. hive: it will change the table properties, the most important
thing is to revise 'metadata_location'
- // 3. and so on ...
+ // commit the iceberg transaction
transaction.commitTransaction();
}
@Override
public void rollback() {
-
+ //do nothing
}
public long getUpdateCnt() {
return
commitDataList.stream().mapToLong(TIcebergCommitData::getRowCount).sum();
}
- public static class CommitTaskData {
- private final String path;
- private final long fileSizeInBytes;
- private final Metrics metrics;
- private final Optional<List<String>> partitionValues;
- private final FileContent content;
- private final Optional<List<String>> referencedDataFiles;
-
- public CommitTaskData(String path,
- long fileSizeInBytes,
- Metrics metrics,
- Optional<List<String>> partitionValues,
- FileContent content,
- Optional<List<String>> referencedDataFiles) {
- this.path = path;
- this.fileSizeInBytes = fileSizeInBytes;
- this.metrics = metrics;
- this.partitionValues =
convertPartitionValuesForNull(partitionValues);
- this.content = content;
- this.referencedDataFiles = referencedDataFiles;
- }
- private Optional<List<String>>
convertPartitionValuesForNull(Optional<List<String>> partitionValues) {
- if (!partitionValues.isPresent()) {
- return partitionValues;
- }
- List<String> values = partitionValues.get();
- if (!values.contains("null")) {
- return partitionValues;
- }
- return Optional.of(values.stream().map(s -> s.equals("null") ?
null : s).collect(Collectors.toList()));
- }
+ private synchronized Table getNativeTable(SimpleTableInfo tableInfo) {
+ Objects.requireNonNull(tableInfo);
+ IcebergExternalCatalog externalCatalog = ops.getExternalCatalog();
+ return IcebergUtils.getRemoteTable(externalCatalog, tableInfo);
+ }
- public String getPath() {
- return path;
+ private void partitionManifestUpdate(TUpdateMode updateMode, Table table,
List<WriteResult> pendingResults) {
+ if (Objects.isNull(pendingResults) || pendingResults.isEmpty()) {
+ LOG.warn("{} partitionManifestUp method call but pendingResults is
null or empty!", table.name());
+ return;
}
-
- public long getFileSizeInBytes() {
- return fileSizeInBytes;
+ // Commit the appendPartitionOperator transaction.
+ if (updateMode == TUpdateMode.APPEND) {
+ commitAppendTxn(table, pendingResults);
+ } else {
+ ReplacePartitions appendPartitionOp = table.newReplacePartitions();
+ for (WriteResult result : pendingResults) {
+ Preconditions.checkState(result.referencedDataFiles().length
== 0,
+ "Should have no referenced data files.");
+
Arrays.stream(result.dataFiles()).forEach(appendPartitionOp::addFile);
+ }
+ appendPartitionOp.commit();
}
+ }
- public Metrics getMetrics() {
- return metrics;
+ private void tableManifestUpdate(TUpdateMode updateMode, Table table,
List<WriteResult> pendingResults) {
+ if (Objects.isNull(pendingResults) || pendingResults.isEmpty()) {
+ LOG.warn("{} tableManifestUp method call but pendingResults is
null or empty!", table.name());
+ return;
}
-
- public Optional<List<String>> getPartitionValues() {
- return partitionValues;
+ // Commit the appendPartitionOperator transaction.
+ if (LOG.isDebugEnabled()) {
+ LOG.info("{} tableManifestUp method call ", table.name());
}
-
- public FileContent getContent() {
- return content;
+ if (updateMode == TUpdateMode.APPEND) {
+ commitAppendTxn(table, pendingResults);
+ } else {
+ ReplacePartitions appendPartitionOp = table.newReplacePartitions();
+ for (WriteResult result : pendingResults) {
+ Preconditions.checkState(result.referencedDataFiles().length
== 0,
+ "Should have no referenced data files.");
+
Arrays.stream(result.dataFiles()).forEach(appendPartitionOp::addFile);
+ }
+ appendPartitionOp.commit();
}
+ }
- public Optional<List<String>> getReferencedDataFiles() {
- return referencedDataFiles;
+
+ private void commitAppendTxn(Table table, List<WriteResult>
pendingResults) {
+ // To be compatible with iceberg format V1.
+ AppendFiles appendFiles = table.newAppend();
+ for (WriteResult result : pendingResults) {
+ Preconditions.checkState(result.referencedDataFiles().length == 0,
+ "Should have no referenced data files for append.");
+ Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
}
+ appendFiles.commit();
}
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index 2aa5dda35a4..512e6a3ee93 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -43,6 +43,7 @@ import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.info.SimpleTableInfo;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
@@ -50,6 +51,7 @@ import
org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.thrift.TExprOpcode;
import com.google.common.collect.Lists;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
@@ -87,6 +89,8 @@ public class IcebergUtils {
// https://iceberg.apache.org/spec/#schemas-and-data-types
// All time and timestamp values are stored with microsecond precision
private static final int ICEBERG_DATETIME_SCALE_MS = 6;
+ private static final String PARQUET_NAME = "parquet";
+ private static final String ORC_NAME = "orc";
public static final String TOTAL_RECORDS = "total-records";
public static final String TOTAL_POSITION_DELETES =
"total-position-deletes";
@@ -522,8 +526,8 @@ public class IcebergUtils {
case MAP:
Types.MapType map = (Types.MapType) type;
return new MapType(
- icebergTypeToDorisType(map.keyType()),
- icebergTypeToDorisType(map.valueType())
+ icebergTypeToDorisType(map.keyType()),
+ icebergTypeToDorisType(map.valueType())
);
case STRUCT:
Types.StructType struct = (Types.StructType) type;
@@ -536,11 +540,30 @@ public class IcebergUtils {
}
}
+
public static org.apache.iceberg.Table getIcebergTable(ExternalCatalog
catalog, String dbName, String tblName) {
+ return getIcebergTableInternal(catalog, dbName, tblName, false);
+ }
+
+ public static org.apache.iceberg.Table getAndCloneTable(ExternalCatalog
catalog, SimpleTableInfo tableInfo) {
+ return getIcebergTableInternal(catalog, tableInfo.getDbName(),
tableInfo.getTbName(), true);
+ }
+
+ public static org.apache.iceberg.Table getRemoteTable(ExternalCatalog
catalog, SimpleTableInfo tableInfo) {
return Env.getCurrentEnv()
.getExtMetaCacheMgr()
.getIcebergMetadataCache()
- .getIcebergTable(catalog, dbName, tblName);
+ .getRemoteTable(catalog, tableInfo.getDbName(),
tableInfo.getTbName());
+ }
+
+ private static org.apache.iceberg.Table
getIcebergTableInternal(ExternalCatalog catalog, String dbName,
+ String tblName,
+ boolean isClone) {
+ IcebergMetadataCache metadataCache = Env.getCurrentEnv()
+ .getExtMetaCacheMgr()
+ .getIcebergMetadataCache();
+ return isClone ? metadataCache.getAndCloneTable(catalog, dbName,
tblName)
+ : metadataCache.getIcebergTable(catalog, dbName, tblName);
}
/**
@@ -587,17 +610,27 @@ public class IcebergUtils {
return -1;
}
- public static String getFileFormat(Table table) {
- Map<String, String> properties = table.properties();
+
+ public static FileFormat getFileFormat(Table icebergTable) {
+ Map<String, String> properties = icebergTable.properties();
+ String fileFormatName;
if (properties.containsKey(WRITE_FORMAT)) {
- return properties.get(WRITE_FORMAT);
+ fileFormatName = properties.get(WRITE_FORMAT);
+ } else {
+ fileFormatName =
properties.getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, PARQUET_NAME);
}
- if (properties.containsKey(TableProperties.DEFAULT_FILE_FORMAT)) {
- return properties.get(TableProperties.DEFAULT_FILE_FORMAT);
+ FileFormat fileFormat;
+ if (fileFormatName.toLowerCase().contains(ORC_NAME)) {
+ fileFormat = FileFormat.ORC;
+ } else if (fileFormatName.toLowerCase().contains(PARQUET_NAME)) {
+ fileFormat = FileFormat.PARQUET;
+ } else {
+ throw new RuntimeException("Unsupported input format type: " +
fileFormatName);
}
- return TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+ return fileFormat;
}
+
public static String getFileCompress(Table table) {
Map<String, String> properties = table.properties();
if (properties.containsKey(COMPRESSION_CODEC)) {
@@ -605,11 +638,11 @@ public class IcebergUtils {
} else if (properties.containsKey(SPARK_SQL_COMPRESSION_CODEC)) {
return properties.get(SPARK_SQL_COMPRESSION_CODEC);
}
- String fileFormat = getFileFormat(table);
- if (fileFormat.equalsIgnoreCase("parquet")) {
+ FileFormat fileFormat = getFileFormat(table);
+ if (fileFormat == FileFormat.PARQUET) {
return properties.getOrDefault(
TableProperties.PARQUET_COMPRESSION,
TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0);
- } else if (fileFormat.equalsIgnoreCase("orc")) {
+ } else if (fileFormat == FileFormat.ORC) {
return properties.getOrDefault(
TableProperties.ORC_COMPRESSION,
TableProperties.ORC_COMPRESSION_DEFAULT);
}
@@ -620,9 +653,10 @@ public class IcebergUtils {
Map<String, String> properties = table.properties();
if
(properties.containsKey(TableProperties.WRITE_LOCATION_PROVIDER_IMPL)) {
throw new NotSupportedException(
- "Table " + table.name() + " specifies " +
properties.get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL)
- + " as a location provider. "
- + "Writing to Iceberg tables with custom location provider
is not supported.");
+ "Table " + table.name() + " specifies " + properties
+ .get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL)
+ + " as a location provider. "
+ + "Writing to Iceberg tables with custom location
provider is not supported.");
}
String dataLocation =
properties.get(TableProperties.WRITE_DATA_LOCATION);
if (dataLocation == null) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java
new file mode 100644
index 00000000000..4171a0536f9
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.helper;
+
+import org.apache.doris.datasource.statistics.CommonStatistics;
+import org.apache.doris.thrift.TIcebergCommitData;
+
+import com.google.common.base.VerifyException;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.io.WriteResult;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class IcebergWriterHelper {
+
+ private static final int DEFAULT_FILE_COUNT = 1;
+
+ public static WriteResult convertToWriterResult(
+ FileFormat format,
+ PartitionSpec spec,
+ List<TIcebergCommitData> commitDataList) {
+ List<DataFile> dataFiles = new ArrayList<>();
+ for (TIcebergCommitData commitData : commitDataList) {
+ //get the files path
+ String location = commitData.getFilePath();
+
+ //get the commit file statistics
+ long fileSize = commitData.getFileSize();
+ long recordCount = commitData.getRowCount();
+ CommonStatistics stat = new CommonStatistics(recordCount,
DEFAULT_FILE_COUNT, fileSize);
+
+ Optional<List<String>> partValues = Optional.empty();
+ //get and check partitionValues when table is partitionedTable
+ if (spec.isPartitioned()) {
+ List<String> partitionValues = commitData.getPartitionValues();
+ if (Objects.isNull(partitionValues) ||
partitionValues.isEmpty()) {
+ throw new VerifyException("No partition data for
partitioned table");
+ }
+ partitionValues = partitionValues.stream().map(s ->
s.equals("null") ? null : s)
+ .collect(Collectors.toList());
+ partValues = Optional.of(partitionValues);
+ }
+ DataFile dataFile = genDataFile(format, location, spec,
partValues, stat);
+ dataFiles.add(dataFile);
+ }
+ return WriteResult.builder()
+ .addDataFiles(dataFiles)
+ .build();
+
+ }
+
+ public static DataFile genDataFile(
+ FileFormat format,
+ String location,
+ PartitionSpec spec,
+ Optional<List<String>> partValues,
+ CommonStatistics statistics) {
+
+ DataFiles.Builder builder = DataFiles.builder(spec)
+ .withPath(location)
+ .withFileSizeInBytes(statistics.getTotalFileBytes())
+ .withRecordCount(statistics.getRowCount())
+ .withFormat(format);
+
+ partValues.ifPresent(builder::withPartitionValues);
+
+ return builder.build();
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
index e590e918344..56ff188f964 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
@@ -61,7 +61,7 @@ public class IcebergApiSource implements IcebergSource {
@Override
public String getFileFormat() {
- return IcebergUtils.getFileFormat(originTable);
+ return IcebergUtils.getFileFormat(originTable).name();
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
index 06b785a15f8..5e9860171d0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
@@ -41,7 +41,7 @@ public class IcebergHMSSource implements IcebergSource {
private final org.apache.iceberg.Table icebergTable;
public IcebergHMSSource(HMSExternalTable hmsTable, TupleDescriptor desc,
- Map<String, ColumnRange> columnNameToRange) {
+ Map<String, ColumnRange> columnNameToRange) {
this.hmsTable = hmsTable;
this.desc = desc;
this.columnNameToRange = columnNameToRange;
@@ -58,7 +58,7 @@ public class IcebergHMSSource implements IcebergSource {
@Override
public String getFileFormat() throws DdlException, MetaNotFoundException {
- return IcebergUtils.getFileFormat(icebergTable);
+ return IcebergUtils.getFileFormat(icebergTable).name();
}
public org.apache.iceberg.Table getIcebergTable() throws
MetaNotFoundException {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/statistics/CommonStatistics.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/statistics/CommonStatistics.java
new file mode 100644
index 00000000000..9685dfdf35a
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/statistics/CommonStatistics.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.statistics;
+
+public class CommonStatistics {
+
+ public static final CommonStatistics EMPTY = new CommonStatistics(0L, 0L,
0L);
+
+ private final long rowCount;
+ private final long fileCount;
+ private final long totalFileBytes;
+
+ public CommonStatistics(long rowCount, long fileCount, long
totalFileBytes) {
+ this.fileCount = fileCount;
+ this.rowCount = rowCount;
+ this.totalFileBytes = totalFileBytes;
+ }
+
+ public long getRowCount() {
+ return rowCount;
+ }
+
+ public long getFileCount() {
+ return fileCount;
+ }
+
+ public long getTotalFileBytes() {
+ return totalFileBytes;
+ }
+
+ public static CommonStatistics reduce(
+ CommonStatistics current,
+ CommonStatistics update,
+ ReduceOperator operator) {
+ return new CommonStatistics(
+ reduce(current.getRowCount(), update.getRowCount(), operator),
+ reduce(current.getFileCount(), update.getFileCount(),
operator),
+ reduce(current.getTotalFileBytes(),
update.getTotalFileBytes(), operator));
+ }
+
+ public static long reduce(long current, long update, ReduceOperator
operator) {
+ if (current >= 0 && update >= 0) {
+ switch (operator) {
+ case ADD:
+ return current + update;
+ case SUBTRACT:
+ return current - update;
+ case MAX:
+ return Math.max(current, update);
+ case MIN:
+ return Math.min(current, update);
+ default:
+ throw new IllegalArgumentException("Unexpected operator: "
+ operator);
+ }
+ }
+
+ return 0;
+ }
+
+ public enum ReduceOperator {
+ ADD,
+ SUBTRACT,
+ MIN,
+ MAX,
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
index b19c483c9f3..86b1f1ef0b7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
@@ -18,6 +18,7 @@
package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.info.SimpleTableInfo;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergTransaction;
import org.apache.doris.nereids.NereidsPlanner;
@@ -39,9 +40,9 @@ public class IcebergInsertExecutor extends
BaseExternalTableInsertExecutor {
* constructor
*/
public IcebergInsertExecutor(ConnectContext ctx, IcebergExternalTable
table,
- String labelName, NereidsPlanner planner,
- Optional<InsertCommandContext> insertCtx,
- boolean emptyInsert) {
+ String labelName, NereidsPlanner planner,
+ Optional<InsertCommandContext> insertCtx,
+ boolean emptyInsert) {
super(ctx, table, labelName, planner, insertCtx, emptyInsert);
}
@@ -51,11 +52,23 @@ public class IcebergInsertExecutor extends
BaseExternalTableInsertExecutor {
coordinator.setIcebergCommitDataFunc(transaction::updateIcebergCommitData);
}
+ @Override
+ protected void beforeExec() {
+ String dbName = ((IcebergExternalTable) table).getDbName();
+ String tbName = table.getName();
+ SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbName);
+ IcebergTransaction transaction = (IcebergTransaction)
transactionManager.getTransaction(txnId);
+ transaction.beginInsert(tableInfo);
+ }
+
@Override
protected void doBeforeCommit() throws UserException {
+ String dbName = ((IcebergExternalTable) table).getDbName();
+ String tbName = table.getName();
+ SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbName);
IcebergTransaction transaction = (IcebergTransaction)
transactionManager.getTransaction(txnId);
- loadedRows = transaction.getUpdateCnt();
- transaction.finishInsert();
+ this.loadedRows = transaction.getUpdateCnt();
+ transaction.finishInsert(tableInfo, insertCtx);
}
@Override
@@ -63,9 +76,4 @@ public class IcebergInsertExecutor extends
BaseExternalTableInsertExecutor {
return TransactionType.ICEBERG;
}
- @Override
- protected void beforeExec() {
- IcebergTransaction transaction = (IcebergTransaction)
transactionManager.getTransaction(txnId);
- transaction.beginInsert(((IcebergExternalTable) table).getDbName(),
table.getName());
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
index 659be7cb1fe..0e01b599964 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
@@ -121,7 +121,7 @@ public class IcebergTableSink extends
BaseExternalTableDataSink {
}
// file info
-
tSink.setFileFormat(getTFileFormatType(IcebergUtils.getFileFormat(icebergTable)));
+
tSink.setFileFormat(getTFileFormatType(IcebergUtils.getFileFormat(icebergTable).name()));
tSink.setCompressionType(getTFileCompressType(IcebergUtils.getFileCompress(icebergTable)));
// hadoop config
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
index 3d6486f9391..f4b802aaa99 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
@@ -17,6 +17,7 @@
package org.apache.doris.transaction;
+
import org.apache.doris.catalog.Env;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.iceberg.IcebergMetadataOps;
@@ -55,12 +56,12 @@ public class IcebergTransactionManager implements
TransactionManager {
}
@Override
- public Transaction getTransaction(long id) {
+ public IcebergTransaction getTransaction(long id) {
return getTransactionWithException(id);
}
- public Transaction getTransactionWithException(long id) {
- Transaction icebergTransaction = transactions.get(id);
+ public IcebergTransaction getTransactionWithException(long id) {
+ IcebergTransaction icebergTransaction = transactions.get(id);
if (icebergTransaction == null) {
throw new RuntimeException("Can't find transaction for " + id);
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
index 10de5427902..4375dc5c025 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
@@ -18,15 +18,22 @@
package org.apache.doris.datasource.iceberg;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.info.SimpleTableInfo;
+import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.thrift.TFileContent;
import org.apache.doris.thrift.TIcebergCommitData;
+import com.google.common.collect.Maps;
+import mockit.Mock;
+import mockit.MockUp;
+import org.apache.commons.lang3.SerializationUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expression;
@@ -39,10 +46,11 @@ import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
+import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
@@ -50,23 +58,26 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
public class IcebergTransactionTest {
- public static String dbName = "db3";
- public static String tbWithPartition = "tbWithPartition";
- public static String tbWithoutPartition = "tbWithoutPartition";
- public static IcebergMetadataOps ops;
- public static Schema schema;
+ private static String dbName = "db3";
+ private static String tbWithPartition = "tbWithPartition";
+ private static String tbWithoutPartition = "tbWithoutPartition";
- @BeforeClass
- public static void beforeClass() throws IOException {
+ private IcebergExternalCatalog externalCatalog;
+ private IcebergMetadataOps ops;
+
+
+ @Before
+ public void init() throws IOException {
createCatalog();
createTable();
}
- public static void createCatalog() throws IOException {
+ private void createCatalog() throws IOException {
Path warehousePath = Files.createTempDirectory("test_warehouse_");
String warehouse = "file://" + warehousePath.toAbsolutePath() + "/";
HadoopCatalog hadoopCatalog = new HadoopCatalog();
@@ -74,25 +85,32 @@ public class IcebergTransactionTest {
props.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
hadoopCatalog.setConf(new Configuration());
hadoopCatalog.initialize("df", props);
- ops = new IcebergMetadataOps(null, hadoopCatalog);
+ this.externalCatalog = new IcebergHMSExternalCatalog(1L, "iceberg", "", Maps.newHashMap(), "");
+ new MockUp<IcebergHMSExternalCatalog>() {
+ @Mock
+ public Catalog getCatalog() {
+ return hadoopCatalog;
+ }
+ };
+ ops = new IcebergMetadataOps(externalCatalog, hadoopCatalog);
}
- public static void createTable() throws IOException {
+ private void createTable() throws IOException {
HadoopCatalog icebergCatalog = (HadoopCatalog) ops.getCatalog();
icebergCatalog.createNamespace(Namespace.of(dbName));
- schema = new Schema(
- Types.NestedField.required(11, "ts1", Types.TimestampType.withoutZone()),
- Types.NestedField.required(12, "ts2", Types.TimestampType.withoutZone()),
- Types.NestedField.required(13, "ts3", Types.TimestampType.withoutZone()),
- Types.NestedField.required(14, "ts4", Types.TimestampType.withoutZone()),
- Types.NestedField.required(15, "dt1", Types.DateType.get()),
- Types.NestedField.required(16, "dt2", Types.DateType.get()),
- Types.NestedField.required(17, "dt3", Types.DateType.get()),
- Types.NestedField.required(18, "dt4", Types.DateType.get()),
- Types.NestedField.required(19, "str1", Types.StringType.get()),
- Types.NestedField.required(20, "str2", Types.StringType.get()),
- Types.NestedField.required(21, "int1", Types.IntegerType.get()),
- Types.NestedField.required(22, "int2", Types.IntegerType.get())
+ Schema schema = new Schema(
+ Types.NestedField.required(11, "ts1", Types.TimestampType.withoutZone()),
+ Types.NestedField.required(12, "ts2", Types.TimestampType.withoutZone()),
+ Types.NestedField.required(13, "ts3", Types.TimestampType.withoutZone()),
+ Types.NestedField.required(14, "ts4", Types.TimestampType.withoutZone()),
+ Types.NestedField.required(15, "dt1", Types.DateType.get()),
+ Types.NestedField.required(16, "dt2", Types.DateType.get()),
+ Types.NestedField.required(17, "dt3", Types.DateType.get()),
+ Types.NestedField.required(18, "dt4", Types.DateType.get()),
+ Types.NestedField.required(19, "str1", Types.StringType.get()),
+ Types.NestedField.required(20, "str2", Types.StringType.get()),
+ Types.NestedField.required(21, "int1", Types.IntegerType.get()),
+ Types.NestedField.required(22, "int2", Types.IntegerType.get())
);
PartitionSpec partitionSpec = PartitionSpec.builderFor(schema)
@@ -112,7 +130,7 @@ public class IcebergTransactionTest {
icebergCatalog.createTable(TableIdentifier.of(dbName, tbWithoutPartition), schema);
}
- public List<String> createPartitionValues() {
+ private List<String> createPartitionValues() {
Instant instant = Instant.parse("2024-12-11T12:34:56.123456Z");
long ts = DateTimeUtil.microsFromInstant(instant);
@@ -165,14 +183,23 @@ public class IcebergTransactionTest {
ctdList.add(ctd1);
ctdList.add(ctd2);
+ Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithPartition));
+
+ new MockUp<IcebergUtils>() {
+ @Mock
+ public Table getRemoteTable(ExternalCatalog catalog, SimpleTableInfo tableInfo) {
+ return table;
+ }
+ };
+
IcebergTransaction txn = getTxn();
txn.updateIcebergCommitData(ctdList);
- txn.beginInsert(dbName, tbWithPartition);
- txn.finishInsert();
+ SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbWithPartition);
+ txn.beginInsert(tableInfo);
+ txn.finishInsert(tableInfo, Optional.empty());
txn.commit();
- Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithPartition));
- checkSnapshotProperties(table.currentSnapshot().summary(), "6", "2", "6");
+ checkSnapshotProperties(table.currentSnapshot().summary(), "6", "2", "6");
checkPushDownByPartitionForTs(table, "ts1");
checkPushDownByPartitionForTs(table, "ts2");
checkPushDownByPartitionForTs(table, "ts3");
@@ -189,7 +216,7 @@ public class IcebergTransactionTest {
checkPushDownByPartitionForBucketInt(table, "int1");
}
- public void checkPushDownByPartitionForBucketInt(Table table, String column) {
+ private void checkPushDownByPartitionForBucketInt(Table table, String column) {
// (BucketUtil.hash(15) & Integer.MAX_VALUE) % 2 = 0
Integer i1 = 15;
@@ -212,12 +239,12 @@ public class IcebergTransactionTest {
checkPushDownByPartition(table, greaterThan2, 2);
}
- public void checkPushDownByPartitionForString(Table table, String column) {
+ private void checkPushDownByPartitionForString(Table table, String column) {
// Since the string used to create the partition is in date format, the date check can be reused directly
checkPushDownByPartitionForDt(table, column);
}
- public void checkPushDownByPartitionForTs(Table table, String column) {
+ private void checkPushDownByPartitionForTs(Table table, String column) {
String lessTs = "2023-12-11T12:34:56.123456";
String eqTs = "2024-12-11T12:34:56.123456";
String greaterTs = "2025-12-11T12:34:56.123456";
@@ -230,7 +257,7 @@ public class IcebergTransactionTest {
checkPushDownByPartition(table, greaterThan, 0);
}
- public void checkPushDownByPartitionForDt(Table table, String column) {
+ private void checkPushDownByPartitionForDt(Table table, String column) {
String less = "2023-12-11";
String eq = "2024-12-11";
String greater = "2025-12-11";
@@ -243,7 +270,7 @@ public class IcebergTransactionTest {
checkPushDownByPartition(table, greaterThan, 0);
}
- public void checkPushDownByPartition(Table table, Expression expr, Integer expectFiles) {
+ private void checkPushDownByPartition(Table table, Expression expr, Integer expectFiles) {
CloseableIterable<FileScanTask> fileScanTasks = table.newScan().filter(expr).planFiles();
AtomicReference<Integer> cnt = new AtomicReference<>(0);
fileScanTasks.forEach(notUse -> cnt.updateAndGet(v -> v + 1));
@@ -268,45 +295,64 @@ public class IcebergTransactionTest {
ctdList.add(ctd1);
ctdList.add(ctd2);
+ Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithoutPartition));
+ new MockUp<IcebergUtils>() {
+ @Mock
+ public Table getRemoteTable(ExternalCatalog catalog, SimpleTableInfo tableInfo) {
+ return table;
+ }
+ };
+
IcebergTransaction txn = getTxn();
txn.updateIcebergCommitData(ctdList);
- txn.beginInsert(dbName, tbWithoutPartition);
- txn.finishInsert();
+ SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbWithPartition);
+ txn.beginInsert(tableInfo);
+ txn.finishInsert(tableInfo, Optional.empty());
txn.commit();
- Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithoutPartition));
checkSnapshotProperties(table.currentSnapshot().summary(), "6", "2", "6");
}
- public void checkSnapshotProperties(Map<String, String> props,
- String addRecords,
- String addFileCnt,
- String addFileSize) {
+ private IcebergTransaction getTxn() {
+ return new IcebergTransaction(ops);
+ }
+
+ private void checkSnapshotProperties(Map<String, String> props,
+ String addRecords,
+ String addFileCnt,
+ String addFileSize) {
Assert.assertEquals(addRecords, props.get("added-records"));
Assert.assertEquals(addFileCnt, props.get("added-data-files"));
Assert.assertEquals(addFileSize, props.get("added-files-size"));
}
- public String numToYear(Integer num) {
+ private String numToYear(Integer num) {
Transform<Object, Integer> year = Transforms.year();
return year.toHumanString(Types.IntegerType.get(), num);
}
- public String numToMonth(Integer num) {
+ private String numToMonth(Integer num) {
Transform<Object, Integer> month = Transforms.month();
return month.toHumanString(Types.IntegerType.get(), num);
}
- public String numToDay(Integer num) {
+ private String numToDay(Integer num) {
Transform<Object, Integer> day = Transforms.day();
return day.toHumanString(Types.IntegerType.get(), num);
}
- public String numToHour(Integer num) {
+ private String numToHour(Integer num) {
Transform<Object, Integer> hour = Transforms.hour();
return hour.toHumanString(Types.IntegerType.get(), num);
}
+ @Test
+ public void tableCloneTest() {
+ Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithoutPartition));
+ Table cloneTable = (Table) SerializationUtils.clone((Serializable) table);
+ Assert.assertNotNull(cloneTable);
+ }
+
@Test
public void testTransform() {
Instant instant = Instant.parse("2024-12-11T12:34:56.123456Z");
@@ -322,7 +368,4 @@ public class IcebergTransactionTest {
Assert.assertEquals("2024-12-11", numToDay(dt));
}
- public IcebergTransaction getTxn() {
- return new IcebergTransaction(ops);
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]