This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 35b121d feat: Migrate format interfaces, column_stats, table
special_fields, and lookup store (#43)
35b121d is described below
commit 35b121ddc1bfa1d5694cea8aa4b61ff9d5097713
Author: lxy <[email protected]>
AuthorDate: Wed Jun 3 14:24:54 2026 +0800
feat: Migrate format interfaces, column_stats, table special_fields, and
lookup store (#43)
---
include/paimon/format/column_stats.h | 199 ++++++++
include/paimon/format/file_format.h | 53 ++
include/paimon/format/file_format_factory.h | 49 ++
include/paimon/format/format_stats_extractor.h | 60 +++
include/paimon/format/format_writer.h | 73 +++
include/paimon/format/reader_builder.h | 46 ++
include/paimon/format/writer_builder.h | 55 ++
src/paimon/common/format/column_stats.cpp | 556 +++++++++++++++++++++
src/paimon/common/format/column_stats_test.cpp | 311 ++++++++++++
src/paimon/common/format/file_format_factory.cpp | 45 ++
src/paimon/common/lookup/lookup_store_factory.cpp | 45 ++
src/paimon/common/lookup/lookup_store_factory.h | 72 +++
.../lookup/sort/sort_lookup_store_factory.cpp | 65 +++
.../common/lookup/sort/sort_lookup_store_factory.h | 95 ++++
.../lookup/sort/sort_lookup_store_footer.cpp | 69 +++
.../common/lookup/sort/sort_lookup_store_footer.h | 62 +++
.../common/lookup/sort/sort_lookup_store_test.cpp | 135 +++++
src/paimon/common/table/special_fields.h | 87 ++++
src/paimon/common/table/special_fields_test.cpp | 70 +++
19 files changed, 2147 insertions(+)
diff --git a/include/paimon/format/column_stats.h
b/include/paimon/format/column_stats.h
new file mode 100644
index 0000000..f16cb25
--- /dev/null
+++ b/include/paimon/format/column_stats.h
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <string>
+
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/defs.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+/// ColumnStats is an abstract base class that represents statistical
information for data columns
+/// in Paimon tables. It provides min/max values and null count statistics
+///
+/// Only primitive data types support min/max statistics. Nested types
(arrays, maps, structs) only
+/// track null counts through `NestedColumnStats`.
+///
+/// @note This is an abstract base class. Use the static factory methods
`CreateXXXColumnStats()` to
+/// create concrete instances for specific data types.
+class PAIMON_EXPORT ColumnStats {
+ public:
+ virtual ~ColumnStats() = default;
+
+ /// Gets the number of null values in this column.
+ virtual std::optional<int64_t> NullCount() const = 0;
+ virtual std::string ToString() const = 0;
+
+ /// Gets the field type that this column statistics instance represents.
+ virtual FieldType GetFieldType() const = 0;
+
+ /// @name CreateXXXColumnStats()
+ /// %Factory methods `CreateXXXColumnStats()` to create column statistics.
+ /// - min/max/null_count for primitive data types
+ /// - null_count for nested data types (arrays, maps, structs)
+ ///
+ /// @{
+ static std::unique_ptr<ColumnStats>
CreateBooleanColumnStats(std::optional<bool> min,
+
std::optional<bool> max,
+
std::optional<int64_t> null_count);
+ static std::unique_ptr<ColumnStats>
CreateTinyIntColumnStats(std::optional<int8_t> min,
+
std::optional<int8_t> max,
+
std::optional<int64_t> null_count);
+ static std::unique_ptr<ColumnStats> CreateSmallIntColumnStats(
+ std::optional<int16_t> min, std::optional<int16_t> max,
std::optional<int64_t> null_count);
+ static std::unique_ptr<ColumnStats>
CreateIntColumnStats(std::optional<int32_t> min,
+
std::optional<int32_t> max,
+
std::optional<int64_t> null_count);
+ static std::unique_ptr<ColumnStats>
CreateBigIntColumnStats(std::optional<int64_t> min,
+
std::optional<int64_t> max,
+
std::optional<int64_t> null_count);
+ static std::unique_ptr<ColumnStats>
CreateFloatColumnStats(std::optional<float> min,
+
std::optional<float> max,
+
std::optional<int64_t> null_count);
+ static std::unique_ptr<ColumnStats>
CreateDoubleColumnStats(std::optional<double> min,
+
std::optional<double> max,
+
std::optional<int64_t> null_count);
+ static std::unique_ptr<ColumnStats> CreateStringColumnStats(
+ const std::optional<std::string>& min, const
std::optional<std::string>& max,
+ std::optional<int64_t> null_count);
+ static std::unique_ptr<ColumnStats> CreateTimestampColumnStats(
+ const std::optional<Timestamp>& min, const std::optional<Timestamp>&
max,
+ std::optional<int64_t> null_count, int32_t precision);
+ static std::unique_ptr<ColumnStats> CreateDecimalColumnStats(const
std::optional<Decimal>& min,
+ const
std::optional<Decimal>& max,
+
std::optional<int64_t> null_count,
+ int32_t
precision, int32_t scale);
+ static std::unique_ptr<ColumnStats>
CreateDateColumnStats(std::optional<int32_t> min,
+
std::optional<int32_t> max,
+
std::optional<int64_t> null_count);
+ /// Creates column statistics for nested data types (arrays, maps,
structs), which only track
+ /// null counts.
+ static std::unique_ptr<ColumnStats> CreateNestedColumnStats(const
FieldType& nested_type,
+
std::optional<int64_t> null_count);
+ /// @}
+};
+
+class PAIMON_EXPORT BooleanColumnStats : public ColumnStats {
+ public:
+ virtual std::optional<bool> Min() const = 0;
+ virtual std::optional<bool> Max() const = 0;
+ virtual void Collect(std::optional<bool> value) = 0;
+};
+
+class PAIMON_EXPORT TinyIntColumnStats : public ColumnStats {
+ public:
+ virtual std::optional<int8_t> Min() const = 0;
+ virtual std::optional<int8_t> Max() const = 0;
+ virtual void Collect(std::optional<int8_t> value) = 0;
+};
+
+class PAIMON_EXPORT SmallIntColumnStats : public ColumnStats {
+ public:
+ virtual std::optional<int16_t> Min() const = 0;
+ virtual std::optional<int16_t> Max() const = 0;
+ virtual void Collect(std::optional<int16_t> value) = 0;
+};
+
+class PAIMON_EXPORT IntColumnStats : public ColumnStats {
+ public:
+ virtual std::optional<int32_t> Min() const = 0;
+ virtual std::optional<int32_t> Max() const = 0;
+ virtual void Collect(std::optional<int32_t> value) = 0;
+};
+
+class PAIMON_EXPORT BigIntColumnStats : public ColumnStats {
+ public:
+ virtual std::optional<int64_t> Min() const = 0;
+ virtual std::optional<int64_t> Max() const = 0;
+ virtual void Collect(std::optional<int64_t> value) = 0;
+};
+
+class PAIMON_EXPORT FloatColumnStats : public ColumnStats {
+ public:
+ virtual std::optional<float> Min() const = 0;
+ virtual std::optional<float> Max() const = 0;
+ virtual void Collect(std::optional<float> value) = 0;
+};
+
+class PAIMON_EXPORT DoubleColumnStats : public ColumnStats {
+ public:
+ virtual std::optional<double> Min() const = 0;
+ virtual std::optional<double> Max() const = 0;
+ virtual void Collect(std::optional<double> value) = 0;
+};
+
+class PAIMON_EXPORT StringColumnStats : public ColumnStats {
+ public:
+ virtual const std::optional<std::string>& Min() const = 0;
+ virtual const std::optional<std::string>& Max() const = 0;
+ virtual void Collect(const std::optional<std::string>& value) = 0;
+};
+
+class PAIMON_EXPORT TimestampColumnStats : public ColumnStats {
+ public:
+ virtual std::optional<Timestamp> Min() const = 0;
+ virtual std::optional<Timestamp> Max() const = 0;
+ virtual void Collect(const std::optional<Timestamp>& value) = 0;
+ virtual int32_t GetPrecision() const = 0;
+};
+
+class PAIMON_EXPORT DecimalColumnStats : public ColumnStats {
+ public:
+ virtual std::optional<Decimal> Min() const = 0;
+ virtual std::optional<Decimal> Max() const = 0;
+ virtual void Collect(const std::optional<Decimal>& value) = 0;
+ virtual int32_t GetPrecision() const = 0;
+ virtual int32_t GetScale() const = 0;
+};
+
+class PAIMON_EXPORT DateColumnStats : public ColumnStats {
+ public:
+ virtual std::optional<int32_t> Min() const = 0;
+ virtual std::optional<int32_t> Max() const = 0;
+ virtual void Collect(std::optional<int32_t> value) = 0;
+};
+
+class PAIMON_EXPORT NestedColumnStats : public ColumnStats {
+ public:
+ NestedColumnStats(const FieldType& nested_type, std::optional<int64_t>
null_count)
+ : nested_type_(nested_type), null_count_(null_count) {
+ assert(nested_type == FieldType::ARRAY || nested_type ==
FieldType::MAP ||
+ nested_type == FieldType::STRUCT);
+ }
+
+ std::optional<int64_t> NullCount() const override {
+ return null_count_;
+ }
+
+ std::string ToString() const override;
+
+ FieldType GetFieldType() const override;
+
+ private:
+ FieldType nested_type_;
+ std::optional<int64_t> null_count_;
+};
+
+} // namespace paimon
diff --git a/include/paimon/format/file_format.h
b/include/paimon/format/file_format.h
new file mode 100644
index 0000000..67c135f
--- /dev/null
+++ b/include/paimon/format/file_format.h
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "paimon/format/format_stats_extractor.h"
+#include "paimon/format/reader_builder.h"
+#include "paimon/format/writer_builder.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/predicate/predicate.h"
+
+struct ArrowSchema;
+
+namespace paimon {
+
+/// `FileFormat` is used to create `ReaderBuilder` and `WriterBuilder`.
+class PAIMON_EXPORT FileFormat {
+ public:
+ virtual ~FileFormat() = default;
+ /// @return The corresponding identifier of file format, e.g., orc.
+ virtual const std::string& Identifier() const = 0;
+ /// @return A reader builder which will create reader with specific batch
size.
+ virtual Result<std::unique_ptr<ReaderBuilder>> CreateReaderBuilder(
+ int32_t batch_size) const = 0;
+
+ /// @return A `WriterBuilder` of the corresponding schema, or error status
when schema is
+ /// invalid.
+ virtual Result<std::unique_ptr<WriterBuilder>> CreateWriterBuilder(
+ ::ArrowSchema* schema, int32_t batch_size) const = 0;
+ /// @return A `FormatStatsExtractor` of current file format.
+ virtual Result<std::unique_ptr<FormatStatsExtractor>> CreateStatsExtractor(
+ ::ArrowSchema* schema) const = 0;
+};
+
+} // namespace paimon
diff --git a/include/paimon/format/file_format_factory.h
b/include/paimon/format/file_format_factory.h
new file mode 100644
index 0000000..b472a8b
--- /dev/null
+++ b/include/paimon/format/file_format_factory.h
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <string>
+
+#include "paimon/factories/factory.h"
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+class FileFormat;
+
+/// A factory for creating `FileFormat` instances.
+class PAIMON_EXPORT FileFormatFactory : public Factory {
+ public:
+ ~FileFormatFactory() override;
+
+ /// Get `FileFormat` corresponding to identifier.
+ ///
+ /// @pre Factory is already registered.
+ static Result<std::unique_ptr<FileFormat>> Get(
+ const std::string& identifier, const std::map<std::string,
std::string>& options);
+
+ /// Create a `FileFormat` with the corresponding options.
+ virtual Result<std::unique_ptr<FileFormat>> Create(
+ const std::map<std::string, std::string>& options) const = 0;
+};
+
+} // namespace paimon
diff --git a/include/paimon/format/format_stats_extractor.h
b/include/paimon/format/format_stats_extractor.h
new file mode 100644
index 0000000..d049aaf
--- /dev/null
+++ b/include/paimon/format/format_stats_extractor.h
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "paimon/result.h"
+#include "paimon/type_fwd.h"
+
+namespace paimon {
+
+/// Extracts statistics directly from file.
+class PAIMON_EXPORT FormatStatsExtractor {
+ public:
+ virtual ~FormatStatsExtractor() = default;
+ /// File info fetched from physical file, currently only include row count.
+ class PAIMON_EXPORT FileInfo {
+ public:
+ explicit FileInfo(int64_t row_count) : row_count_(row_count) {}
+
+ int64_t GetRowCount() const {
+ return row_count_;
+ }
+
+ private:
+ int64_t row_count_;
+ };
+
+ /// Extracts statistics for each column of a data file based on the file
path and file system.
+ virtual Result<ColumnStatsVector> Extract(const
std::shared_ptr<FileSystem>& file_system,
+ const std::string& path,
+ const
std::shared_ptr<MemoryPool>& pool) = 0;
+
+ /// Extracts statistics for each column and `FileInfo` of a data file
based on the file path and
+ /// file system.
+ virtual Result<std::pair<ColumnStatsVector, FileInfo>> ExtractWithFileInfo(
+ const std::shared_ptr<FileSystem>& file_system, const std::string&
path,
+ const std::shared_ptr<MemoryPool>& pool) = 0;
+};
+
+} // namespace paimon
diff --git a/include/paimon/format/format_writer.h
b/include/paimon/format/format_writer.h
new file mode 100644
index 0000000..cf570b3
--- /dev/null
+++ b/include/paimon/format/format_writer.h
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "paimon/type_fwd.h"
+
+struct ArrowArray;
+
+namespace paimon {
+/// File format writer, each writer corresponds to a data file.
+class PAIMON_EXPORT FormatWriter {
+ public:
+ virtual ~FormatWriter() = default;
+
+ /// Add a batch of records to the format writer.
+ /// @param batch Pointer to an ArrowArray containing the batch data to
write.
+ /// @return Status indicating success (OK) or failure with error
information.
+ /// @note The batch must conform to the schema expected by the writer.
+ /// @note This method can be called multiple times to write data
incrementally.
+ /// @note After calling `Finish()`, this method should not be called again.
+ virtual Status AddBatch(::ArrowArray* batch) = 0;
+
+ /// Flushes all intermediate buffered data to the format writer.
+ ///
+ /// @return Error status returned if the encoder cannot be flushed, or if
the output stream
+ /// return an error.
+ virtual Status Flush() = 0;
+
+ /// Finishes the writing. This must flush all internal buffer, finish
encoding, and write
+ /// footers.
+ ///
+ /// @note The writer is not expected to handle any more records via
`AddBatch()` after
+ /// this method is called.
+ ///
+ /// @warning This method **MUST NOT** close the stream that the writer
writes to. Closing
+ /// the stream is expected to happen through the invoker of this method
afterwards.
+ ///
+ /// @return Error status returned if the finalization fails.
+ virtual Status Finish() = 0;
+
+ /// Check if the writer has reached the `target_size`.
+ ///
+ /// @param suggested_check Whether it needs to be checked, but subclasses
can also decide
+ /// whether to check it themselves.
+ /// @param target_size The size of the target.
+ /// @return True if the target size was reached, otherwise false.
+ /// @return Error status returned if calculating the length fails.
+ virtual Result<bool> ReachTargetSize(bool suggested_check, int64_t
target_size) const = 0;
+
+ /// Get metrics of the writer
+ /// @return The accumulated writer metrics to current state.
+ virtual std::shared_ptr<Metrics> GetWriterMetrics() const = 0;
+};
+
+} // namespace paimon
diff --git a/include/paimon/format/reader_builder.h
b/include/paimon/format/reader_builder.h
new file mode 100644
index 0000000..91b604d
--- /dev/null
+++ b/include/paimon/format/reader_builder.h
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "paimon/memory/memory_pool.h"
+#include "paimon/reader/file_batch_reader.h"
+#include "paimon/type_fwd.h"
+
+namespace paimon {
+
+/// Create a file batch reader based on the file path. Allows you to specify
memory pool.
+class PAIMON_EXPORT ReaderBuilder {
+ public:
+ virtual ~ReaderBuilder() = default;
+
+ /// Set memory pool to use.
+ virtual ReaderBuilder* WithMemoryPool(const std::shared_ptr<MemoryPool>&
pool) = 0;
+
+ /// Build a file batch reader based on the created `InputStream`.
+ virtual Result<std::unique_ptr<FileBatchReader>> Build(
+ const std::shared_ptr<InputStream>& path) const = 0;
+
+ /// Build a file batch reader based on the file path.
+ virtual Result<std::unique_ptr<FileBatchReader>> Build(const std::string&
path) const = 0;
+};
+
+} // namespace paimon
diff --git a/include/paimon/format/writer_builder.h
b/include/paimon/format/writer_builder.h
new file mode 100644
index 0000000..33ef07e
--- /dev/null
+++ b/include/paimon/format/writer_builder.h
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "paimon/type_fwd.h"
+
+namespace paimon {
+/// Create a file format writer based on the file output stream. Allows you to
specify memory pool.
+class PAIMON_EXPORT WriterBuilder {
+ public:
+ virtual ~WriterBuilder() = default;
+
+ /// Set memory pool to use.
+ virtual WriterBuilder* WithMemoryPool(const std::shared_ptr<MemoryPool>&
pool) = 0;
+
+ /// Build a file format writer based on the file output stream and file
compression.
+ virtual Result<std::unique_ptr<FormatWriter>> Build(const
std::shared_ptr<OutputStream>& out,
+ const std::string&
compression) = 0;
+};
+
+/// Create a file format writer based on the file path.
+class PAIMON_EXPORT DirectWriterBuilder : public WriterBuilder {
+ public:
+ ~DirectWriterBuilder() override = default;
+ virtual Result<std::unique_ptr<FormatWriter>> BuildFromPath(const
std::string& path) = 0;
+};
+
+/// `SpecificFSWriterBuilder` allows you to specify a specific file system.
+class PAIMON_EXPORT SpecificFSWriterBuilder : public WriterBuilder {
+ public:
+ ~SpecificFSWriterBuilder() override = default;
+ // Set file system to use.
+ virtual SpecificFSWriterBuilder* WithFileSystem(const
std::shared_ptr<FileSystem>& fs) = 0;
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/format/column_stats.cpp
b/src/paimon/common/format/column_stats.cpp
new file mode 100644
index 0000000..d813427
--- /dev/null
+++ b/src/paimon/common/format/column_stats.cpp
@@ -0,0 +1,556 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/column_stats.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "fmt/format.h"
+
+namespace paimon {
+
+template <typename T>
+std::string FormatStatsToString(const T& stats) {
+ auto to_str = [](auto opt) -> std::string { return opt ? fmt::format("{}",
*opt) : "null"; };
+ return fmt::format("min {}, max {}, null count {}", to_str(stats.Min()),
to_str(stats.Max()),
+ to_str(stats.NullCount()));
+}
+
/// Plain value holder backing the primitive column-stats implementations.
///
/// Tracks three statistics, each of which may be absent (`std::nullopt`):
///  - min: the smallest value seen (seeded by the constructor, widened by `Collect()`);
///  - max: the largest value seen (seeded by the constructor, widened by `Collect()`);
///  - null_count: the number of null values collected.
///
/// Ordering uses `operator<` on `T`.
/// NOTE(review): for floating-point `T`, NaN compares false against everything. A NaN
/// collected while min/max are still empty therefore becomes both min and max and is never
/// replaced, while a NaN collected later is ignored. Confirm that this is intended.
template <typename T>
class InternalStatsImpl {
 public:
    InternalStatsImpl(const std::optional<T>& min, const std::optional<T>& max,
                      std::optional<int64_t> null_count)
        : min_(min), max_(max), null_count_(null_count) {}

    const std::optional<T>& Min() const { return min_; }
    const std::optional<T>& Max() const { return max_; }
    std::optional<int64_t> NullCount() const { return null_count_; }

    /// Folds one value into the statistics.
    ///
    /// A null (`std::nullopt`) increments the null count, starting it at 1 if it was
    /// unknown. A non-null value may widen min/max and makes an unknown null count 0.
    void Collect(const std::optional<T>& value) {
        if (!value.has_value()) {
            null_count_ = null_count_.value_or(0) + 1;
            return;
        }
        const T& candidate = *value;
        if (!max_.has_value() || *max_ < candidate) {
            max_ = value;
        }
        if (!min_.has_value() || candidate < *min_) {
            min_ = value;
        }
        if (!null_count_.has_value()) {
            null_count_ = 0;
        }
    }

 private:
    std::optional<T> min_;
    std::optional<T> max_;
    std::optional<int64_t> null_count_;
};
+
+using InternalBooleanStats = InternalStatsImpl<bool>;
+using InternalTinyIntStats = InternalStatsImpl<int8_t>;
+using InternalSmallIntStats = InternalStatsImpl<int16_t>;
+using InternalIntStats = InternalStatsImpl<int32_t>;
+using InternalBigIntStats = InternalStatsImpl<int64_t>;
+using InternalFloatStats = InternalStatsImpl<float>;
+using InternalDoubleStats = InternalStatsImpl<double>;
+using InternalStringStats = InternalStatsImpl<std::string>;
+using InternalTimestampStats = InternalStatsImpl<Timestamp>;
+using InternalDecimalStats = InternalStatsImpl<Decimal>;
+
+class BooleanColumnStatsImpl : public BooleanColumnStats {
+ public:
+ explicit BooleanColumnStatsImpl(const InternalBooleanStats& stats) :
stats_(stats) {}
+
+ std::optional<bool> Min() const override {
+ return stats_.Min();
+ }
+
+ std::optional<bool> Max() const override {
+ return stats_.Max();
+ }
+
+ void Collect(std::optional<bool> value) override {
+ stats_.Collect(value);
+ }
+
+ std::optional<int64_t> NullCount() const override {
+ return stats_.NullCount();
+ }
+
+ std::string ToString() const override {
+ return FormatStatsToString(*this);
+ }
+
+ FieldType GetFieldType() const override {
+ return FieldType::BOOLEAN;
+ }
+
+ private:
+ InternalBooleanStats stats_;
+};
+
+class TinyIntColumnStatsImpl : public TinyIntColumnStats {
+ public:
+ explicit TinyIntColumnStatsImpl(const InternalTinyIntStats& stats) :
stats_(stats) {}
+
+ std::optional<int8_t> Min() const override {
+ return stats_.Min();
+ }
+
+ std::optional<int8_t> Max() const override {
+ return stats_.Max();
+ }
+
+ void Collect(std::optional<int8_t> value) override {
+ stats_.Collect(value);
+ }
+
+ std::optional<int64_t> NullCount() const override {
+ return stats_.NullCount();
+ }
+
+ std::string ToString() const override {
+ return FormatStatsToString(*this);
+ }
+
+ FieldType GetFieldType() const override {
+ return FieldType::TINYINT;
+ }
+
+ private:
+ InternalTinyIntStats stats_;
+};
+
+class SmallIntColumnStatsImpl : public SmallIntColumnStats {
+ public:
+ explicit SmallIntColumnStatsImpl(const InternalSmallIntStats& stats) :
stats_(stats) {}
+
+ std::optional<int16_t> Min() const override {
+ return stats_.Min();
+ }
+
+ std::optional<int16_t> Max() const override {
+ return stats_.Max();
+ }
+
+ void Collect(std::optional<int16_t> value) override {
+ stats_.Collect(value);
+ }
+
+ std::optional<int64_t> NullCount() const override {
+ return stats_.NullCount();
+ }
+
+ std::string ToString() const override {
+ return FormatStatsToString(*this);
+ }
+
+ FieldType GetFieldType() const override {
+ return FieldType::SMALLINT;
+ }
+
+ private:
+ InternalSmallIntStats stats_;
+};
+
+class IntColumnStatsImpl : public IntColumnStats {
+ public:
+ explicit IntColumnStatsImpl(const InternalIntStats& stats) : stats_(stats)
{}
+
+ std::optional<int32_t> Min() const override {
+ return stats_.Min();
+ }
+
+ std::optional<int32_t> Max() const override {
+ return stats_.Max();
+ }
+
+ void Collect(std::optional<int32_t> value) override {
+ stats_.Collect(value);
+ }
+
+ std::optional<int64_t> NullCount() const override {
+ return stats_.NullCount();
+ }
+
+ std::string ToString() const override {
+ return FormatStatsToString(*this);
+ }
+
+ FieldType GetFieldType() const override {
+ return FieldType::INT;
+ }
+
+ private:
+ InternalIntStats stats_;
+};
+
+class BigIntColumnStatsImpl : public BigIntColumnStats {
+ public:
+ explicit BigIntColumnStatsImpl(const InternalBigIntStats& stats) :
stats_(stats) {}
+
+ std::optional<int64_t> Min() const override {
+ return stats_.Min();
+ }
+
+ std::optional<int64_t> Max() const override {
+ return stats_.Max();
+ }
+
+ void Collect(std::optional<int64_t> value) override {
+ stats_.Collect(value);
+ }
+
+ std::optional<int64_t> NullCount() const override {
+ return stats_.NullCount();
+ }
+
+ std::string ToString() const override {
+ return FormatStatsToString(*this);
+ }
+
+ FieldType GetFieldType() const override {
+ return FieldType::BIGINT;
+ }
+
+ private:
+ InternalBigIntStats stats_;
+};
+
+class FloatColumnStatsImpl : public FloatColumnStats {
+ public:
+ explicit FloatColumnStatsImpl(const InternalFloatStats& stats) :
stats_(stats) {}
+
+ std::optional<float> Min() const override {
+ return stats_.Min();
+ }
+
+ std::optional<float> Max() const override {
+ return stats_.Max();
+ }
+
+ void Collect(std::optional<float> value) override {
+ stats_.Collect(value);
+ }
+
+ std::optional<int64_t> NullCount() const override {
+ return stats_.NullCount();
+ }
+
+ std::string ToString() const override {
+ return FormatStatsToString(*this);
+ }
+
+ FieldType GetFieldType() const override {
+ return FieldType::FLOAT;
+ }
+
+ private:
+ InternalFloatStats stats_;
+};
+
+class DoubleColumnStatsImpl : public DoubleColumnStats {
+ public:
+ explicit DoubleColumnStatsImpl(const InternalDoubleStats& stats) :
stats_(stats) {}
+
+ std::optional<double> Min() const override {
+ return stats_.Min();
+ }
+
+ std::optional<double> Max() const override {
+ return stats_.Max();
+ }
+ void Collect(std::optional<double> value) override {
+ stats_.Collect(value);
+ }
+
+ std::optional<int64_t> NullCount() const override {
+ return stats_.NullCount();
+ }
+
+ std::string ToString() const override {
+ return FormatStatsToString(*this);
+ }
+
+ FieldType GetFieldType() const override {
+ return FieldType::DOUBLE;
+ }
+
+ private:
+ InternalDoubleStats stats_;
+};
+
+class StringColumnStatsImpl : public StringColumnStats {
+ public:
+ explicit StringColumnStatsImpl(const InternalStringStats& stats) :
stats_(stats) {}
+
+ const std::optional<std::string>& Min() const override {
+ return stats_.Min();
+ }
+
+ const std::optional<std::string>& Max() const override {
+ return stats_.Max();
+ }
+
+ void Collect(const std::optional<std::string>& value) override {
+ stats_.Collect(value);
+ }
+
+ std::optional<int64_t> NullCount() const override {
+ return stats_.NullCount();
+ }
+
+ std::string ToString() const override {
+ return FormatStatsToString(*this);
+ }
+
+ FieldType GetFieldType() const override {
+ return FieldType::STRING;
+ }
+
+ private:
+ InternalStringStats stats_;
+};
+
+class TimestampColumnStatsImpl : public TimestampColumnStats {
+ public:
+ TimestampColumnStatsImpl(const InternalTimestampStats& stats, int32_t
precision)
+ : stats_(stats), precision_(precision) {}
+
+ std::optional<Timestamp> Min() const override {
+ return stats_.Min();
+ }
+
+ std::optional<Timestamp> Max() const override {
+ return stats_.Max();
+ }
+
+ void Collect(const std::optional<Timestamp>& value) override {
+ return stats_.Collect(value);
+ }
+
+ std::optional<int64_t> NullCount() const override {
+ return stats_.NullCount();
+ }
+
+ std::string ToString() const override {
+ return fmt::format("min {}, max {}, null count {}",
+ Min() ? Min().value().ToString() : "null",
+ Max() ? Max().value().ToString() : "null",
+ NullCount() ? std::to_string(NullCount().value()) :
"null");
+ }
+
+ FieldType GetFieldType() const override {
+ return FieldType::TIMESTAMP;
+ }
+
+ int32_t GetPrecision() const override {
+ return precision_;
+ }
+
+ private:
+ InternalTimestampStats stats_;
+ int32_t precision_;
+};
+
+class DecimalColumnStatsImpl : public DecimalColumnStats {
+ public:
+ DecimalColumnStatsImpl(const InternalDecimalStats& stats, int32_t
precision, int32_t scale)
+ : stats_(stats), precision_(precision), scale_(scale) {}
+
+ std::optional<Decimal> Min() const override {
+ return stats_.Min();
+ }
+
+ std::optional<Decimal> Max() const override {
+ return stats_.Max();
+ }
+
+ void Collect(const std::optional<Decimal>& value) override {
+ return stats_.Collect(value);
+ }
+
+ std::optional<int64_t> NullCount() const override {
+ return stats_.NullCount();
+ }
+
+ std::string ToString() const override {
+ return fmt::format("min {}, max {}, null count {}",
+ Min() ? Min().value().ToString() : "null",
+ Max() ? Max().value().ToString() : "null",
+ NullCount() ? std::to_string(NullCount().value()) :
"null");
+ }
+
+ FieldType GetFieldType() const override {
+ return FieldType::DECIMAL;
+ }
+
+ int32_t GetPrecision() const override {
+ return precision_;
+ }
+
+ int32_t GetScale() const override {
+ return scale_;
+ }
+
+ private:
+ InternalDecimalStats stats_;
+ int32_t precision_;
+ int32_t scale_;
+};
+
+class DateColumnStatsImpl : public DateColumnStats {
+ public:
+ explicit DateColumnStatsImpl(const InternalIntStats& stats) :
stats_(stats) {}
+
+ std::optional<int32_t> Min() const override {
+ return stats_.Min();
+ }
+
+ std::optional<int32_t> Max() const override {
+ return stats_.Max();
+ }
+
+ void Collect(std::optional<int32_t> value) override {
+ stats_.Collect(value);
+ }
+
+ std::optional<int64_t> NullCount() const override {
+ return stats_.NullCount();
+ }
+
+ std::string ToString() const override {
+ return FormatStatsToString(*this);
+ }
+
+ FieldType GetFieldType() const override {
+ return FieldType::DATE;
+ }
+
+ private:
+ InternalIntStats stats_;
+};
+
+std::string NestedColumnStats::ToString() const {
+ return fmt::format("min null, max null, null count {}",
+ NullCount() ? std::to_string(NullCount().value()) :
"null");
+}
+
+FieldType NestedColumnStats::GetFieldType() const {
+ return nested_type_;
+}
+
+std::unique_ptr<ColumnStats> ColumnStats::CreateBooleanColumnStats(
+ std::optional<bool> min, std::optional<bool> max, std::optional<int64_t>
null_count) {
+ return std::make_unique<BooleanColumnStatsImpl>(InternalBooleanStats(min,
max, null_count));
+}
+
+std::unique_ptr<ColumnStats> ColumnStats::CreateTinyIntColumnStats(
+ std::optional<int8_t> min, std::optional<int8_t> max,
std::optional<int64_t> null_count) {
+ return std::make_unique<TinyIntColumnStatsImpl>(InternalTinyIntStats(min,
max, null_count));
+}
+
+std::unique_ptr<ColumnStats> ColumnStats::CreateSmallIntColumnStats(
+ std::optional<int16_t> min, std::optional<int16_t> max,
std::optional<int64_t> null_count) {
+ return
std::make_unique<SmallIntColumnStatsImpl>(InternalSmallIntStats(min, max,
null_count));
+}
+
+std::unique_ptr<ColumnStats>
ColumnStats::CreateIntColumnStats(std::optional<int32_t> min,
+
std::optional<int32_t> max,
+
std::optional<int64_t> null_count) {
+ return std::make_unique<IntColumnStatsImpl>(InternalIntStats(min, max,
null_count));
+}
+
+std::unique_ptr<ColumnStats> ColumnStats::CreateBigIntColumnStats(
+ std::optional<int64_t> min, std::optional<int64_t> max,
std::optional<int64_t> null_count) {
+ return std::make_unique<BigIntColumnStatsImpl>(InternalBigIntStats(min,
max, null_count));
+}
+
+std::unique_ptr<ColumnStats> ColumnStats::CreateFloatColumnStats(
+ std::optional<float> min, std::optional<float> max, std::optional<int64_t>
null_count) {
+ return std::make_unique<FloatColumnStatsImpl>(InternalFloatStats(min, max,
null_count));
+}
+
+std::unique_ptr<ColumnStats> ColumnStats::CreateDoubleColumnStats(
+ std::optional<double> min, std::optional<double> max,
std::optional<int64_t> null_count) {
+ return std::make_unique<DoubleColumnStatsImpl>(InternalDoubleStats(min,
max, null_count));
+}
+
+std::unique_ptr<ColumnStats> ColumnStats::CreateStringColumnStats(
+ const std::optional<std::string>& min, const std::optional<std::string>&
max,
+ std::optional<int64_t> null_count) {
+ return std::make_unique<StringColumnStatsImpl>(InternalStringStats(min,
max, null_count));
+}
+
+std::unique_ptr<ColumnStats> ColumnStats::CreateTimestampColumnStats(
+ const std::optional<Timestamp>& min, const std::optional<Timestamp>& max,
+ std::optional<int64_t> null_count, int32_t precision) {
+ return
std::make_unique<TimestampColumnStatsImpl>(InternalTimestampStats(min, max,
null_count),
+ precision);
+}
+
+std::unique_ptr<ColumnStats> ColumnStats::CreateDecimalColumnStats(
+ const std::optional<Decimal>& min, const std::optional<Decimal>& max,
+ std::optional<int64_t> null_count, int32_t precision, int32_t scale) {
+ return std::make_unique<DecimalColumnStatsImpl>(InternalDecimalStats(min,
max, null_count),
+ precision, scale);
+}
+
+std::unique_ptr<ColumnStats>
ColumnStats::CreateDateColumnStats(std::optional<int32_t> min,
+
std::optional<int32_t> max,
+
std::optional<int64_t> null_count) {
+ return std::make_unique<DateColumnStatsImpl>(InternalIntStats(min, max,
null_count));
+}
+
+std::unique_ptr<ColumnStats> ColumnStats::CreateNestedColumnStats(
+ const FieldType& nested_type, std::optional<int64_t> null_count) {
+ return std::make_unique<NestedColumnStats>(nested_type, null_count);
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/format/column_stats_test.cpp
b/src/paimon/common/format/column_stats_test.cpp
new file mode 100644
index 0000000..4937cfd
--- /dev/null
+++ b/src/paimon/common/format/column_stats_test.cpp
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/column_stats.h"
+
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/common/utils/decimal_utils.h"
+#include "paimon/result.h"
+
+namespace paimon::test {
+
+TEST(ColumnStatsTest, TestBooleanColumnStats) {
+ auto stats = ColumnStats::CreateBooleanColumnStats(std::nullopt,
std::nullopt, std::nullopt);
+ auto typed_stats = dynamic_cast<BooleanColumnStats*>(stats.get());
+ typed_stats->Collect(std::nullopt);
+ ASSERT_EQ(std::nullopt, typed_stats->Min());
+ ASSERT_EQ(std::nullopt, typed_stats->Max());
+ ASSERT_EQ(1, typed_stats->NullCount().value());
+ ASSERT_EQ("min null, max null, null count 1", stats->ToString());
+ typed_stats->Collect(true);
+ ASSERT_EQ(true, typed_stats->Min().value());
+ ASSERT_EQ(true, typed_stats->Max().value());
+ ASSERT_EQ(1, typed_stats->NullCount().value());
+ typed_stats->Collect(std::nullopt);
+ ASSERT_EQ(true, typed_stats->Min().value());
+ ASSERT_EQ(true, typed_stats->Max().value());
+ ASSERT_EQ(2, typed_stats->NullCount().value());
+ ASSERT_EQ("min true, max true, null count 2", stats->ToString());
+}
+
+TEST(ColumnStatsTest, TestTinyIntColumnStats) {
+ auto stats = ColumnStats::CreateTinyIntColumnStats(std::nullopt,
std::nullopt, std::nullopt);
+ auto typed_stats = dynamic_cast<TinyIntColumnStats*>(stats.get());
+ typed_stats->Collect(std::nullopt);
+ ASSERT_EQ(std::nullopt, typed_stats->Min());
+ ASSERT_EQ(std::nullopt, typed_stats->Max());
+ ASSERT_EQ(1, typed_stats->NullCount().value());
+ ASSERT_EQ("min null, max null, null count 1", stats->ToString());
+ typed_stats->Collect(5);
+ ASSERT_EQ(5, typed_stats->Min().value());
+ ASSERT_EQ(5, typed_stats->Max().value());
+ typed_stats->Collect(10);
+ ASSERT_EQ(5, typed_stats->Min().value());
+ ASSERT_EQ(10, typed_stats->Max().value());
+ typed_stats->Collect(1);
+ ASSERT_EQ(1, typed_stats->Min().value());
+ ASSERT_EQ(10, typed_stats->Max().value());
+ ASSERT_EQ(1, typed_stats->NullCount().value());
+ typed_stats->Collect(std::nullopt);
+ ASSERT_EQ(1, typed_stats->Min().value());
+ ASSERT_EQ(10, typed_stats->Max().value());
+ ASSERT_EQ(2, typed_stats->NullCount().value());
+ ASSERT_EQ("min 1, max 10, null count 2", stats->ToString());
+}
+TEST(ColumnStatsTest, TestSmallIntColumnStats) {
+ auto stats = ColumnStats::CreateSmallIntColumnStats(std::nullopt,
std::nullopt, std::nullopt);
+ auto typed_stats = dynamic_cast<SmallIntColumnStats*>(stats.get());
+ typed_stats->Collect(std::nullopt);
+ ASSERT_EQ(std::nullopt, typed_stats->Min());
+ ASSERT_EQ(std::nullopt, typed_stats->Max());
+ ASSERT_EQ(1, typed_stats->NullCount().value());
+ ASSERT_EQ("min null, max null, null count 1", stats->ToString());
+ typed_stats->Collect(5);
+ ASSERT_EQ(5, typed_stats->Min().value());
+ ASSERT_EQ(5, typed_stats->Max().value());
+ typed_stats->Collect(10);
+ ASSERT_EQ(5, typed_stats->Min().value());
+ ASSERT_EQ(10, typed_stats->Max().value());
+ typed_stats->Collect(1);
+ ASSERT_EQ(1, typed_stats->Min().value());
+ ASSERT_EQ(10, typed_stats->Max().value());
+ ASSERT_EQ(1, typed_stats->NullCount().value());
+ typed_stats->Collect(std::nullopt);
+ ASSERT_EQ(1, typed_stats->Min().value());
+ ASSERT_EQ(10, typed_stats->Max().value());
+ ASSERT_EQ(2, typed_stats->NullCount().value());
+ ASSERT_EQ("min 1, max 10, null count 2", stats->ToString());
+}
+
+TEST(ColumnStatsTest, TestIntColumnStats) {
+ auto stats = ColumnStats::CreateIntColumnStats(std::nullopt, std::nullopt,
std::nullopt);
+ auto typed_stats = dynamic_cast<IntColumnStats*>(stats.get());
+ typed_stats->Collect(std::nullopt);
+ ASSERT_EQ(std::nullopt, typed_stats->Min());
+ ASSERT_EQ(std::nullopt, typed_stats->Max());
+ ASSERT_EQ(1, typed_stats->NullCount().value());
+ ASSERT_EQ("min null, max null, null count 1", stats->ToString());
+ typed_stats->Collect(5);
+ ASSERT_EQ(5, typed_stats->Min().value());
+ ASSERT_EQ(5, typed_stats->Max().value());
+ typed_stats->Collect(10);
+ ASSERT_EQ(5, typed_stats->Min().value());
+ ASSERT_EQ(10, typed_stats->Max().value());
+ typed_stats->Collect(1);
+ ASSERT_EQ(1, typed_stats->Min().value());
+ ASSERT_EQ(10, typed_stats->Max().value());
+ ASSERT_EQ(1, typed_stats->NullCount().value());
+ typed_stats->Collect(std::nullopt);
+ ASSERT_EQ(1, typed_stats->Min().value());
+ ASSERT_EQ(10, typed_stats->Max().value());
+ ASSERT_EQ(2, typed_stats->NullCount().value());
+ ASSERT_EQ("min 1, max 10, null count 2", stats->ToString());
+}
+TEST(ColumnStatsTest, TestDateColumnStats) {
+ auto stats = ColumnStats::CreateDateColumnStats(std::nullopt,
std::nullopt, std::nullopt);
+ auto typed_stats = dynamic_cast<DateColumnStats*>(stats.get());
+ typed_stats->Collect(std::nullopt);
+ ASSERT_EQ(std::nullopt, typed_stats->Min());
+ ASSERT_EQ(std::nullopt, typed_stats->Max());
+ ASSERT_EQ(1, typed_stats->NullCount().value());
+ ASSERT_EQ("min null, max null, null count 1", stats->ToString());
+ typed_stats->Collect(5);
+ ASSERT_EQ(5, typed_stats->Min().value());
+ ASSERT_EQ(5, typed_stats->Max().value());
+ typed_stats->Collect(10);
+ ASSERT_EQ(5, typed_stats->Min().value());
+ ASSERT_EQ(10, typed_stats->Max().value());
+ typed_stats->Collect(1);
+ ASSERT_EQ(1, typed_stats->Min().value());
+ ASSERT_EQ(10, typed_stats->Max().value());
+ ASSERT_EQ(1, typed_stats->NullCount().value());
+ typed_stats->Collect(std::nullopt);
+ ASSERT_EQ(1, typed_stats->Min().value());
+ ASSERT_EQ(10, typed_stats->Max().value());
+ ASSERT_EQ(2, typed_stats->NullCount().value());
+ ASSERT_EQ("min 1, max 10, null count 2", stats->ToString());
+}
+TEST(ColumnStatsTest, TestBigIntColumnStats) {
+ auto stats = ColumnStats::CreateBigIntColumnStats(std::nullopt,
std::nullopt, std::nullopt);
+ auto typed_stats = dynamic_cast<BigIntColumnStats*>(stats.get());
+ typed_stats->Collect(std::nullopt);
+ ASSERT_EQ(std::nullopt, typed_stats->Min());
+ ASSERT_EQ(std::nullopt, typed_stats->Max());
+ ASSERT_EQ(1l, typed_stats->NullCount().value());
+ ASSERT_EQ("min null, max null, null count 1", stats->ToString());
+ typed_stats->Collect(5l);
+ ASSERT_EQ(5l, typed_stats->Min().value());
+ ASSERT_EQ(5l, typed_stats->Max().value());
+ typed_stats->Collect(10l);
+ ASSERT_EQ(5l, typed_stats->Min().value());
+ ASSERT_EQ(10l, typed_stats->Max().value());
+ typed_stats->Collect(1l);
+ ASSERT_EQ(1l, typed_stats->Min().value());
+ ASSERT_EQ(10l, typed_stats->Max().value());
+ ASSERT_EQ(1l, typed_stats->NullCount().value());
+ typed_stats->Collect(std::nullopt);
+ ASSERT_EQ(1l, typed_stats->Min().value());
+ ASSERT_EQ(10l, typed_stats->Max().value());
+ ASSERT_EQ(2, typed_stats->NullCount().value());
+ ASSERT_EQ("min 1, max 10, null count 2", stats->ToString());
+}
+TEST(ColumnStatsTest, TestFloatColumnStats) {
+ auto stats = ColumnStats::CreateFloatColumnStats(std::nullopt,
std::nullopt, std::nullopt);
+ auto typed_stats = dynamic_cast<FloatColumnStats*>(stats.get());
+ typed_stats->Collect(std::nullopt);
+ ASSERT_EQ(std::nullopt, typed_stats->Min());
+ ASSERT_EQ(std::nullopt, typed_stats->Max());
+ ASSERT_EQ(1, typed_stats->NullCount().value());
+ ASSERT_EQ("min null, max null, null count 1", stats->ToString());
+ typed_stats->Collect(5.1f);
+ ASSERT_EQ(5.1f, typed_stats->Min().value());
+ ASSERT_EQ(5.1f, typed_stats->Max().value());
+ typed_stats->Collect(10.1f);
+ ASSERT_EQ(5.1f, typed_stats->Min().value());
+ ASSERT_EQ(10.1f, typed_stats->Max().value());
+ typed_stats->Collect(1.1f);
+ ASSERT_EQ(1.1f, typed_stats->Min().value());
+ ASSERT_EQ(10.1f, typed_stats->Max().value());
+ ASSERT_EQ(1, typed_stats->NullCount().value());
+ typed_stats->Collect(std::nullopt);
+ ASSERT_EQ(1.1f, typed_stats->Min().value());
+ ASSERT_EQ(10.1f, typed_stats->Max().value());
+ ASSERT_EQ(2, typed_stats->NullCount().value());
+ ASSERT_EQ("min 1.1, max 10.1, null count 2", stats->ToString());
+}
+
+TEST(ColumnStatsTest, TestDoubleColumnStats) {
+ auto stats = ColumnStats::CreateDoubleColumnStats(std::nullopt,
std::nullopt, std::nullopt);
+ auto typed_stats = dynamic_cast<DoubleColumnStats*>(stats.get());
+ typed_stats->Collect(std::nullopt);
+ ASSERT_EQ(std::nullopt, typed_stats->Min());
+ ASSERT_EQ(std::nullopt, typed_stats->Max());
+ ASSERT_EQ(1, typed_stats->NullCount().value());
+ ASSERT_EQ("min null, max null, null count 1", stats->ToString());
+ typed_stats->Collect(5.12);
+ ASSERT_EQ(5.12, typed_stats->Min().value());
+ ASSERT_EQ(5.12, typed_stats->Max().value());
+ typed_stats->Collect(10.12);
+ ASSERT_EQ(5.12, typed_stats->Min().value());
+ ASSERT_EQ(10.12, typed_stats->Max().value());
+ typed_stats->Collect(1.12);
+ ASSERT_EQ(1.12, typed_stats->Min().value());
+ ASSERT_EQ(10.12, typed_stats->Max().value());
+ ASSERT_EQ(1, typed_stats->NullCount().value());
+ typed_stats->Collect(std::nullopt);
+ ASSERT_EQ(1.12, typed_stats->Min().value());
+ ASSERT_EQ(10.12, typed_stats->Max().value());
+ ASSERT_EQ(2, typed_stats->NullCount().value());
+ ASSERT_EQ("min 1.12, max 10.12, null count 2", stats->ToString());
+}
+
+TEST(ColumnStatsTest, TestStringColumnStats) {
+ auto stats = ColumnStats::CreateStringColumnStats(std::nullopt,
std::nullopt, std::nullopt);
+ auto typed_stats = dynamic_cast<StringColumnStats*>(stats.get());
+ typed_stats->Collect(std::nullopt);
+ ASSERT_EQ(std::nullopt, typed_stats->Min());
+ ASSERT_EQ(std::nullopt, typed_stats->Max());
+ ASSERT_EQ(1, typed_stats->NullCount().value());
+ ASSERT_EQ("min null, max null, null count 1", stats->ToString());
+ typed_stats->Collect("abc");
+ ASSERT_EQ("abc", typed_stats->Min().value());
+ ASSERT_EQ("abc", typed_stats->Max().value());
+ typed_stats->Collect("cba");
+ ASSERT_EQ("abc", typed_stats->Min().value());
+ ASSERT_EQ("cba", typed_stats->Max().value());
+ typed_stats->Collect("你好");
+ ASSERT_EQ("abc", typed_stats->Min().value());
+ ASSERT_EQ("你好", typed_stats->Max().value());
+ ASSERT_EQ(1, typed_stats->NullCount().value());
+ typed_stats->Collect(std::nullopt);
+ ASSERT_EQ("abc", typed_stats->Min().value());
+ ASSERT_EQ("你好", typed_stats->Max().value());
+ ASSERT_EQ(2, typed_stats->NullCount().value());
+ ASSERT_EQ("min abc, max 你好, null count 2", stats->ToString());
+}
+
+TEST(ColumnStatsTest, TestTimestampColumnStats) {
+ auto stats = ColumnStats::CreateTimestampColumnStats(std::nullopt,
std::nullopt, std::nullopt,
+ /*precision=*/9);
+ auto typed_stats = dynamic_cast<TimestampColumnStats*>(stats.get());
+ typed_stats->Collect(std::nullopt);
+ ASSERT_EQ(std::nullopt, typed_stats->Min());
+ ASSERT_EQ(std::nullopt, typed_stats->Max());
+ ASSERT_EQ(1l, typed_stats->NullCount().value());
+ ASSERT_EQ("min null, max null, null count 1", stats->ToString());
+ typed_stats->Collect(Timestamp(/*millisecond=*/10,
/*nano_of_millisecond=*/1));
+ ASSERT_EQ(Timestamp(/*millisecond=*/10, /*nano_of_millisecond=*/1),
typed_stats->Min().value());
+ ASSERT_EQ(Timestamp(/*millisecond=*/10, /*nano_of_millisecond=*/1),
typed_stats->Max().value());
+ typed_stats->Collect(Timestamp(/*millisecond=*/10,
/*nano_of_millisecond=*/2));
+ ASSERT_EQ(Timestamp(/*millisecond=*/10, /*nano_of_millisecond=*/1),
typed_stats->Min().value());
+ ASSERT_EQ(Timestamp(/*millisecond=*/10, /*nano_of_millisecond=*/2),
typed_stats->Max().value());
+ typed_stats->Collect(Timestamp(/*millisecond=*/9,
/*nano_of_millisecond=*/1));
+ ASSERT_EQ(Timestamp(/*millisecond=*/9, /*nano_of_millisecond=*/1),
typed_stats->Min().value());
+ ASSERT_EQ(Timestamp(/*millisecond=*/10, /*nano_of_millisecond=*/2),
typed_stats->Max().value());
+ ASSERT_EQ(1l, typed_stats->NullCount().value());
+ typed_stats->Collect(std::nullopt);
+ ASSERT_EQ(Timestamp(/*millisecond=*/9, /*nano_of_millisecond=*/1),
typed_stats->Min().value());
+ ASSERT_EQ(Timestamp(/*millisecond=*/10, /*nano_of_millisecond=*/2),
typed_stats->Max().value());
+ ASSERT_EQ(2, typed_stats->NullCount().value());
+ ASSERT_EQ("min 1970-01-01 00:00:00.009000001, max 1970-01-01
00:00:00.010000002, null count 2",
+ stats->ToString());
+}
+
+TEST(ColumnStatsTest, TestDecimalColumnStats) {
+ int32_t precision = 21;
+ int32_t scale = 3;
+ auto stats = ColumnStats::CreateDecimalColumnStats(std::nullopt,
std::nullopt, std::nullopt,
+ precision, scale);
+ auto typed_stats = dynamic_cast<DecimalColumnStats*>(stats.get());
+ typed_stats->Collect(std::nullopt);
+ ASSERT_EQ(std::nullopt, typed_stats->Min());
+ ASSERT_EQ(std::nullopt, typed_stats->Max());
+ ASSERT_EQ(1l, typed_stats->NullCount().value());
+ ASSERT_EQ("min null, max null, null count 1", stats->ToString());
+ typed_stats->Collect(
+ Decimal(precision, scale,
DecimalUtils::StrToInt128("123456789987654321234").value()));
+ ASSERT_EQ(Decimal(precision, scale,
DecimalUtils::StrToInt128("123456789987654321234").value()),
+ typed_stats->Min().value());
+ ASSERT_EQ(Decimal(precision, scale,
DecimalUtils::StrToInt128("123456789987654321234").value()),
+ typed_stats->Max().value());
+ typed_stats->Collect(
+ Decimal(precision, scale,
DecimalUtils::StrToInt128("923456789987654321234").value()));
+ ASSERT_EQ(Decimal(precision, scale,
DecimalUtils::StrToInt128("123456789987654321234").value()),
+ typed_stats->Min().value());
+ ASSERT_EQ(Decimal(precision, scale,
DecimalUtils::StrToInt128("923456789987654321234").value()),
+ typed_stats->Max().value());
+ typed_stats->Collect(
+ Decimal(precision, scale,
DecimalUtils::StrToInt128("123456789987654321233").value()));
+ ASSERT_EQ(Decimal(precision, scale,
DecimalUtils::StrToInt128("123456789987654321233").value()),
+ typed_stats->Min().value());
+ ASSERT_EQ(Decimal(precision, scale,
DecimalUtils::StrToInt128("923456789987654321234").value()),
+ typed_stats->Max().value());
+ ASSERT_EQ(1l, typed_stats->NullCount().value());
+ typed_stats->Collect(std::nullopt);
+ ASSERT_EQ(Decimal(precision, scale,
DecimalUtils::StrToInt128("123456789987654321233").value()),
+ typed_stats->Min().value());
+ ASSERT_EQ(Decimal(precision, scale,
DecimalUtils::StrToInt128("923456789987654321234").value()),
+ typed_stats->Max().value());
+ ASSERT_EQ(2, typed_stats->NullCount().value());
+ ASSERT_EQ("min 123456789987654321.233, max 923456789987654321.234, null
count 2",
+ stats->ToString());
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/format/file_format_factory.cpp
b/src/paimon/common/format/file_format_factory.cpp
new file mode 100644
index 0000000..27fbfce
--- /dev/null
+++ b/src/paimon/common/format/file_format_factory.cpp
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/file_format_factory.h"
+
+#include "fmt/format.h"
+#include "paimon/factories/factory_creator.h"
+#include "paimon/format/file_format.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+FileFormatFactory::~FileFormatFactory() = default;
+
+Result<std::unique_ptr<FileFormat>> FileFormatFactory::Get(
+ const std::string& identifier, const std::map<std::string, std::string>&
options) {
+ auto factory_creator = FactoryCreator::GetInstance();
+ if (factory_creator == nullptr) {
+ return Status::Invalid("factory creator is null pointer");
+ }
+ auto file_format_factory =
+ dynamic_cast<FileFormatFactory*>(factory_creator->Create(identifier));
+ if (file_format_factory == nullptr) {
+ return Status::Invalid(fmt::format(
+ "Could not find a FileFormatFactory implementation class for
format '{}'", identifier));
+ }
+ return file_format_factory->Create(options);
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/lookup/lookup_store_factory.cpp
b/src/paimon/common/lookup/lookup_store_factory.cpp
new file mode 100644
index 0000000..c5742e1
--- /dev/null
+++ b/src/paimon/common/lookup/lookup_store_factory.cpp
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/lookup/lookup_store_factory.h"
+
+#include "paimon/common/lookup/sort/sort_lookup_store_factory.h"
+namespace paimon {
+Result<std::shared_ptr<LookupStoreFactory>> LookupStoreFactory::Create(
+ MemorySlice::SliceComparator comparator, const
std::shared_ptr<CacheManager>& cache_manager,
+ const CoreOptions& options) {
+ const auto& compress_options = options.GetLookupCompressOptions();
+ PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<BlockCompressionFactory>
compression_factory,
+ BlockCompressionFactory::Create(compress_options));
+ return std::make_shared<SortLookupStoreFactory>(
+ std::move(comparator), cache_manager, options.GetCachePageSize(),
compression_factory);
+}
+
+Result<std::shared_ptr<BloomFilter>> LookupStoreFactory::BfGenerator(int64_t
row_count,
+ const
CoreOptions& options,
+
MemoryPool* pool) {
+ if (row_count <= 0 || !options.LookupCacheBloomFilterEnabled()) {
+ return std::shared_ptr<BloomFilter>();
+ }
+ auto bloom_filter = BloomFilter::Create(row_count,
options.GetLookupCacheBloomFilterFpp());
+ MemorySegment memory_segment =
+ MemorySegment::AllocateHeapMemory(bloom_filter->ByteLength(), pool);
+ PAIMON_RETURN_NOT_OK(bloom_filter->SetMemorySegment(memory_segment));
+ return bloom_filter;
+}
+} // namespace paimon
diff --git a/src/paimon/common/lookup/lookup_store_factory.h
b/src/paimon/common/lookup/lookup_store_factory.h
new file mode 100644
index 0000000..222dc7b
--- /dev/null
+++ b/src/paimon/common/lookup/lookup_store_factory.h
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include "paimon/common/io/cache/cache_manager.h"
+#include "paimon/common/memory/memory_slice.h"
+#include "paimon/common/utils/bloom_filter.h"
+#include "paimon/core/core_options.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/result.h"
+namespace paimon {
+/// Reader, lookup value by key bytes.
+class LookupStoreReader {
+ public:
+ virtual ~LookupStoreReader() = default;
+ /// Lookup value by key.
+ virtual Result<std::shared_ptr<Bytes>> Lookup(const
std::shared_ptr<Bytes>& key) const = 0;
+ virtual Status Close() = 0;
+};
+
+/// Writer to prepare binary file.
+class LookupStoreWriter {
+ public:
+ virtual ~LookupStoreWriter() = default;
+ virtual Status Put(std::shared_ptr<Bytes>&& key, std::shared_ptr<Bytes>&&
value) = 0;
+ virtual Status Close() = 0;
+};
+
+/// A key-value store for lookup, key-value store should be single binary file
written once and
+/// ready to be used. This factory provide two interfaces:
+///
+/// Writer: written once to prepare binary file.
+/// Reader: lookup value by key bytes.
+class LookupStoreFactory {
+ public:
+ virtual ~LookupStoreFactory() = default;
+
+ virtual Result<std::unique_ptr<LookupStoreWriter>> CreateWriter(
+ const std::shared_ptr<paimon::FileSystem>& fs, const std::string&
file_path,
+ const std::shared_ptr<BloomFilter>& bloom_filter,
+ const std::shared_ptr<MemoryPool>& pool) const = 0;
+
+ virtual Result<std::unique_ptr<LookupStoreReader>> CreateReader(
+ const std::shared_ptr<paimon::FileSystem>& fs, const std::string&
file_path,
+ const std::shared_ptr<MemoryPool>& pool) const = 0;
+
+ static Result<std::shared_ptr<LookupStoreFactory>> Create(
+ MemorySlice::SliceComparator comparator, const
std::shared_ptr<CacheManager>& cache_manager,
+ const CoreOptions& options);
+
+ static Result<std::shared_ptr<BloomFilter>> BfGenerator(int64_t row_count,
+ const CoreOptions&
options,
+ MemoryPool* pool);
+};
+} // namespace paimon
diff --git a/src/paimon/common/lookup/sort/sort_lookup_store_factory.cpp
b/src/paimon/common/lookup/sort/sort_lookup_store_factory.cpp
new file mode 100644
index 0000000..e376dff
--- /dev/null
+++ b/src/paimon/common/lookup/sort/sort_lookup_store_factory.cpp
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/lookup/sort/sort_lookup_store_factory.h"
+
+#include "paimon/common/lookup/sort/sort_lookup_store_footer.h"
+#include "paimon/common/memory/memory_slice.h"
+#include "paimon/memory/bytes.h"
+namespace paimon {
+Result<std::unique_ptr<LookupStoreWriter>>
SortLookupStoreFactory::CreateWriter(
+ const std::shared_ptr<paimon::FileSystem>& fs, const std::string&
file_path,
+ const std::shared_ptr<BloomFilter>& bloom_filter,
+ const std::shared_ptr<MemoryPool>& pool) const {
+ PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<OutputStream> out,
+ fs->Create(file_path, /*overwrite=*/false));
+ return std::make_unique<SortLookupStoreWriter>(
+ out,
+ std::make_shared<SstFileWriter>(out, bloom_filter, block_size_,
compression_factory_, pool),
+ pool);
+}
+
+Result<std::unique_ptr<LookupStoreReader>>
SortLookupStoreFactory::CreateReader(
+ const std::shared_ptr<paimon::FileSystem>& fs, const std::string&
file_path,
+ const std::shared_ptr<MemoryPool>& pool) const {
+ PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<InputStream> in,
fs->Open(file_path));
+ auto block_cache = std::make_shared<BlockCache>(file_path, in,
cache_manager_, pool);
+ PAIMON_ASSIGN_OR_RAISE(
+ std::shared_ptr<SstFileReader> reader,
+ SstFileReader::CreateForSortLookupStore(in, comparator_, block_cache,
pool));
+ return std::make_unique<SortLookupStoreReader>(in, reader);
+}
+
+Status SortLookupStoreReader::Close() {
+ PAIMON_RETURN_NOT_OK(reader_->Close());
+ return in_->Close();
+}
+
+Status SortLookupStoreWriter::Close() {
+ PAIMON_RETURN_NOT_OK(writer_->Flush());
+ PAIMON_ASSIGN_OR_RAISE(std::optional<BloomFilterHandle>
bloom_filter_handle,
+ writer_->WriteBloomFilter());
+ PAIMON_ASSIGN_OR_RAISE(BlockHandle index_block_handle,
writer_->WriteIndexBlock());
+ SortLookupStoreFooter footer(index_block_handle, bloom_filter_handle);
+ auto slice = footer.WriteSortLookupStoreFooter(pool_.get());
+ PAIMON_RETURN_NOT_OK(writer_->WriteSlice(slice));
+ PAIMON_RETURN_NOT_OK(out_->Close());
+ return Status::OK();
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/lookup/sort/sort_lookup_store_factory.h
b/src/paimon/common/lookup/sort/sort_lookup_store_factory.h
new file mode 100644
index 0000000..3aa193c
--- /dev/null
+++ b/src/paimon/common/lookup/sort/sort_lookup_store_factory.h
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "paimon/common/lookup/lookup_store_factory.h"
+#include "paimon/common/memory/memory_slice.h"
+#include "paimon/common/sst/sst_file_reader.h"
+#include "paimon/common/sst/sst_file_writer.h"
+#include "paimon/common/utils/bloom_filter.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/result.h"
+namespace paimon {
+/// Reader, lookup value by key bytes.
+class SortLookupStoreReader : public LookupStoreReader {
+ public:
+ SortLookupStoreReader(const std::shared_ptr<InputStream>& in,
+ const std::shared_ptr<SstFileReader>& reader)
+ : in_(in), reader_(reader) {}
+
+ Result<std::shared_ptr<Bytes>> Lookup(const std::shared_ptr<Bytes>& key)
const override {
+ return reader_->Lookup(key);
+ }
+
+ Status Close() override;
+
+ private:
+ std::shared_ptr<InputStream> in_;
+ std::shared_ptr<SstFileReader> reader_;
+};
+
+/// Writer to prepare binary file.
+class SortLookupStoreWriter : public LookupStoreWriter {
+ public:
+ SortLookupStoreWriter(const std::shared_ptr<OutputStream>& out,
+ const std::shared_ptr<SstFileWriter>& writer,
+ const std::shared_ptr<MemoryPool>& pool)
+ : out_(out), writer_(writer), pool_(pool) {}
+
+ Status Put(std::shared_ptr<Bytes>&& key, std::shared_ptr<Bytes>&& value)
override {
+ return writer_->Write(std::move(key), std::move(value));
+ }
+
+ Status Close() override;
+
+ private:
+ std::shared_ptr<OutputStream> out_;
+ std::shared_ptr<SstFileWriter> writer_;
+ std::shared_ptr<MemoryPool> pool_;
+};
+
+/// A `LookupStoreFactory` which uses hash to lookup records on disk.
+class SortLookupStoreFactory : public LookupStoreFactory {
+ public:
+ SortLookupStoreFactory(MemorySlice::SliceComparator comparator,
+ const std::shared_ptr<CacheManager>& cache_manager,
int32_t block_size,
+ const std::shared_ptr<BlockCompressionFactory>&
compression_factory)
+ : block_size_(block_size),
+ comparator_(std::move(comparator)),
+ cache_manager_(cache_manager),
+ compression_factory_(compression_factory) {}
+
+ Result<std::unique_ptr<LookupStoreWriter>> CreateWriter(
+ const std::shared_ptr<paimon::FileSystem>& fs, const std::string&
file_path,
+ const std::shared_ptr<BloomFilter>& bloom_filter,
+ const std::shared_ptr<MemoryPool>& pool) const override;
+
+ Result<std::unique_ptr<LookupStoreReader>> CreateReader(
+ const std::shared_ptr<paimon::FileSystem>& fs, const std::string&
file_path,
+ const std::shared_ptr<MemoryPool>& pool) const override;
+
+ private:
+ int32_t block_size_;
+ MemorySlice::SliceComparator comparator_;
+ std::shared_ptr<CacheManager> cache_manager_;
+ std::shared_ptr<BlockCompressionFactory> compression_factory_;
+};
+} // namespace paimon
diff --git a/src/paimon/common/lookup/sort/sort_lookup_store_footer.cpp
b/src/paimon/common/lookup/sort/sort_lookup_store_footer.cpp
new file mode 100644
index 0000000..d15ece5
--- /dev/null
+++ b/src/paimon/common/lookup/sort/sort_lookup_store_footer.cpp
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/lookup/sort/sort_lookup_store_footer.h"
+
+#include "fmt/format.h"
+#include "paimon/common/memory/memory_slice_output.h"
+
+namespace paimon {
+
+Result<std::unique_ptr<SortLookupStoreFooter>>
SortLookupStoreFooter::ReadSortLookupStoreFooter(
+ MemorySliceInput* input) {
+ auto offset = input->ReadLong();
+ auto size = input->ReadInt();
+ auto expected_entries = input->ReadLong();
+ std::optional<BloomFilterHandle> bloom_filter_handle;
+ if (offset || size || expected_entries) {
+ bloom_filter_handle.emplace(offset, size, expected_entries);
+ }
+ auto index_offset = input->ReadLong();
+ auto index_size = input->ReadInt();
+ BlockHandle index_block_handle(index_offset, index_size);
+
+ // skip padding
+ PAIMON_RETURN_NOT_OK(input->SetPosition(ENCODED_LENGTH - 4));
+
+ auto magic = input->ReadInt();
+ if (magic != MAGIC_NUMBER) {
+ return Status::Invalid(
+ fmt::format("Expected magic number {}, but got {}", MAGIC_NUMBER,
magic));
+ }
+ return std::make_unique<SortLookupStoreFooter>(index_block_handle,
bloom_filter_handle);
+}
+
+MemorySlice SortLookupStoreFooter::WriteSortLookupStoreFooter(MemoryPool*
pool) {
+ MemorySliceOutput output(ENCODED_LENGTH, pool);
+ // 20 bytes
+ if (!bloom_filter_handle_) {
+ output.WriteValue(static_cast<int64_t>(0));
+ output.WriteValue(static_cast<int32_t>(0));
+ output.WriteValue(static_cast<int64_t>(0));
+ } else {
+ output.WriteValue(bloom_filter_handle_->Offset());
+ output.WriteValue(bloom_filter_handle_->Size());
+ output.WriteValue(bloom_filter_handle_->ExpectedEntries());
+ }
+ // 12 bytes
+ output.WriteValue(index_block_handle_.Offset());
+ output.WriteValue(index_block_handle_.Size());
+ // 4 bytes
+ output.WriteValue(MAGIC_NUMBER);
+ return output.ToSlice();
+}
+} // namespace paimon
diff --git a/src/paimon/common/lookup/sort/sort_lookup_store_footer.h
b/src/paimon/common/lookup/sort/sort_lookup_store_footer.h
new file mode 100644
index 0000000..f1462a8
--- /dev/null
+++ b/src/paimon/common/lookup/sort/sort_lookup_store_footer.h
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <optional>
+
+#include "paimon/common/memory/memory_slice_input.h"
+#include "paimon/common/sst/block_handle.h"
+#include "paimon/common/sst/bloom_filter_handle.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// Footer of a sort lookup store.
+class PAIMON_EXPORT SortLookupStoreFooter {
+ public:
+ static Result<std::unique_ptr<SortLookupStoreFooter>>
ReadSortLookupStoreFooter(
+ MemorySliceInput* input);
+
+ public:
+ SortLookupStoreFooter(const BlockHandle& index_block_handle,
+ const std::optional<BloomFilterHandle>&
bloom_filter_handle)
+ : index_block_handle_(index_block_handle),
bloom_filter_handle_(bloom_filter_handle) {}
+
+ ~SortLookupStoreFooter() = default;
+
+ const BlockHandle& GetIndexBlockHandle() const {
+ return index_block_handle_;
+ }
+ const std::optional<BloomFilterHandle>& GetBloomFilterHandle() const {
+ return bloom_filter_handle_;
+ }
+
+ MemorySlice WriteSortLookupStoreFooter(MemoryPool* pool);
+
+ public:
+ // 20 bytes for bloom filter handle, 12 bytes for index block handle, 4
bytes for magic number
+ static constexpr int32_t ENCODED_LENGTH = 36;
+ static constexpr int32_t MAGIC_NUMBER = 1481571681;
+
+ private:
+ BlockHandle index_block_handle_;
+ std::optional<BloomFilterHandle> bloom_filter_handle_;
+};
+} // namespace paimon
diff --git a/src/paimon/common/lookup/sort/sort_lookup_store_test.cpp
b/src/paimon/common/lookup/sort/sort_lookup_store_test.cpp
new file mode 100644
index 0000000..e5847e8
--- /dev/null
+++ b/src/paimon/common/lookup/sort/sort_lookup_store_test.cpp
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "arrow/api.h"
+#include "arrow/ipc/api.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/columnar/columnar_row.h"
+#include "paimon/common/data/serializer/row_compacted_serializer.h"
+#include "paimon/common/lookup/lookup_store_factory.h"
+#include "paimon/common/lookup/sort/sort_lookup_store_factory.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+namespace paimon::test {
+TEST(SortLookupStoreTest, TestSimple) {
+ auto pool = GetDefaultPool();
+ auto check_result = [&](const std::map<std::string, std::string>&
options_map) {
+ auto dir = UniqueTestDirectory::Create("local");
+ std::string file_path = dir->Str() + "/test.lookup";
+ auto fs = dir->GetFileSystem();
+
+ arrow::FieldVector fields = {
+ arrow::field("key", arrow::utf8()),
+ arrow::field("value", arrow::int32()),
+ };
+ auto key_type = arrow::struct_({fields[0]});
+ auto value_type = arrow::struct_({fields[1]});
+ ASSERT_OK_AND_ASSIGN(auto comparator,
RowCompactedSerializer::CreateSliceComparator(
+
arrow::schema(key_type->fields()), pool));
+ ASSERT_OK_AND_ASSIGN(CoreOptions options,
CoreOptions::FromMap(options_map));
+
+ ASSERT_OK_AND_ASSIGN(
+ auto factory,
+ LookupStoreFactory::Create(comparator,
std::make_shared<CacheManager>(1024 * 1024, 0.0),
+ options));
+ int64_t row_count = 6;
+ ASSERT_OK_AND_ASSIGN(auto bloom_filter,
+ LookupStoreFactory::BfGenerator(row_count,
options, pool.get()));
+ ASSERT_OK_AND_ASSIGN(auto writer, factory->CreateWriter(fs, file_path,
bloom_filter, pool));
+
+ std::shared_ptr<arrow::Array> key_array =
+ arrow::ipc::internal::json::ArrayFromJSON(key_type,
+ R"([
+ ["Alex"],
+ ["Alice"],
+ ["Bob"],
+ ["David"],
+ ["Lily"],
+ ["Lucy"]
+ ])")
+ .ValueOrDie();
+ std::shared_ptr<arrow::Array> value_array =
+ arrow::ipc::internal::json::ArrayFromJSON(value_type,
+ R"([
+ [0],
+ [10],
+ [20],
+ [30],
+ [40],
+ [100]
+ ])")
+ .ValueOrDie();
+
+ ASSERT_OK_AND_ASSIGN(auto key_serializer,
RowCompactedSerializer::Create(
+
arrow::schema(key_type->fields()), pool));
+ ASSERT_OK_AND_ASSIGN(auto value_serializer,
RowCompactedSerializer::Create(
+
arrow::schema(value_type->fields()), pool));
+ // write data
+ std::vector<std::pair<std::shared_ptr<Bytes>, std::shared_ptr<Bytes>>>
key_value_bytes_vec;
+ for (int64_t i = 0; i < row_count; i++) {
+ auto key_struct_array =
std::dynamic_pointer_cast<arrow::StructArray>(key_array);
+ auto value_struct_array =
std::dynamic_pointer_cast<arrow::StructArray>(value_array);
+ ASSERT_TRUE(key_struct_array);
+ ASSERT_TRUE(value_struct_array);
+ auto key_row = std::make_shared<ColumnarRow>(
+ /*struct_array=*/nullptr, key_struct_array->fields(), pool,
/*row_id=*/i);
+ auto value_row = std::make_shared<ColumnarRow>(
+ /*struct_array=*/nullptr, value_struct_array->fields(), pool,
/*row_id=*/i);
+ ASSERT_OK_AND_ASSIGN(auto key_bytes,
key_serializer->SerializeToBytes(*key_row));
+ ASSERT_OK_AND_ASSIGN(auto value_bytes,
value_serializer->SerializeToBytes(*value_row));
+ key_value_bytes_vec.emplace_back(key_bytes, value_bytes);
+ ASSERT_OK(writer->Put(std::move(key_bytes),
std::move(value_bytes)));
+ }
+ ASSERT_OK(writer->Close());
+
+ // read data
+ ASSERT_TRUE(fs->Exists(file_path).value());
+ ASSERT_OK_AND_ASSIGN(auto reader, factory->CreateReader(fs, file_path,
pool));
+ for (int64_t i = 0; i < row_count; i++) {
+ const auto& [key, value] = key_value_bytes_vec[i];
+ ASSERT_OK_AND_ASSIGN(auto value_bytes, reader->Lookup(key));
+ ASSERT_TRUE(value_bytes);
+ // test value bytes
+ ASSERT_EQ(*value_bytes, *value);
+
+ // test deserialize data
+ ASSERT_OK_AND_ASSIGN(auto de_row,
value_serializer->Deserialize(value_bytes));
+ auto value_struct_array =
std::dynamic_pointer_cast<arrow::StructArray>(value_array);
+ ASSERT_TRUE(value_struct_array);
+ auto typed_value_array =
+
std::dynamic_pointer_cast<arrow::Int32Array>(value_struct_array->field(0));
+ ASSERT_TRUE(typed_value_array);
+ ASSERT_EQ(de_row->GetInt(0), typed_value_array->Value(i));
+ }
+ // test non-exist key
+ auto non_exist_key = std::make_shared<Bytes>("non-exist", pool.get());
+ ASSERT_OK_AND_ASSIGN(auto value_bytes, reader->Lookup(non_exist_key));
+ ASSERT_FALSE(value_bytes);
+ ASSERT_OK(reader->Close());
+ };
+
+ check_result({});
+ check_result(std::map<std::string, std::string>(
+ {{Options::LOOKUP_CACHE_BLOOM_FILTER_ENABLED, "false"}}));
+ check_result(
+ std::map<std::string,
std::string>({{Options::LOOKUP_CACHE_SPILL_COMPRESSION, "lz4"}}));
+ check_result(
+ std::map<std::string,
std::string>({{Options::SPILL_COMPRESSION_ZSTD_LEVEL, "3"}}));
+ check_result(std::map<std::string,
std::string>({{Options::CACHE_PAGE_SIZE, "4"}}));
+}
+} // namespace paimon::test
diff --git a/src/paimon/common/table/special_fields.h
b/src/paimon/common/table/special_fields.h
new file mode 100644
index 0000000..384150e
--- /dev/null
+++ b/src/paimon/common/table/special_fields.h
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <limits>
+#include <string>
+
+#include "arrow/type_fwd.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/utils/special_field_ids.h"
+
+namespace paimon {
+
+struct SpecialFields {
+ SpecialFields() = delete;
+ ~SpecialFields() = delete;
+
+ static constexpr char KEY_FIELD_PREFIX[] = "_KEY_";
+ static constexpr int32_t KEY_VALUE_SPECIAL_FIELD_COUNT = 2;
+
+ static const DataField& SequenceNumber() {
+ static const DataField data_field = DataField(
+ SpecialFieldIds::SEQUENCE_NUMBER, arrow::field("_SEQUENCE_NUMBER",
arrow::int64()));
+ return data_field;
+ }
+
+ static const DataField& ValueKind() {
+ static const DataField data_field =
+ DataField(SpecialFieldIds::VALUE_KIND, arrow::field("_VALUE_KIND",
arrow::int8()));
+ return data_field;
+ }
+
+ static const DataField& RowKind() {
+ static const DataField data_field =
+ DataField(SpecialFieldIds::ROW_KIND, arrow::field("rowkind",
arrow::utf8()));
+ return data_field;
+ }
+
+ static const DataField& RowId() {
+ static const DataField data_field =
+ DataField(SpecialFieldIds::ROW_ID, arrow::field("_ROW_ID",
arrow::int64()));
+ return data_field;
+ }
+
+ static const DataField& IndexScore() {
+ static const DataField data_field =
+ DataField(SpecialFieldIds::INDEX_SCORE,
arrow::field("_INDEX_SCORE", arrow::float32()));
+ return data_field;
+ }
+
+ static bool IsSpecialFieldName(const std::string& field_name) {
+ if (field_name == SequenceNumber().Name() || field_name ==
ValueKind().Name() ||
+ field_name == RowId().Name() || field_name == IndexScore().Name())
{
+ return true;
+ }
+ return false;
+ }
+ // TODO(xinyu.lxy): add a func to complete row-tracking fields
+
+ static std::shared_ptr<arrow::Schema> CompleteSequenceAndValueKindField(
+ const std::shared_ptr<arrow::Schema>& schema) {
+ arrow::FieldVector target_fields;
+
target_fields.push_back(DataField::ConvertDataFieldToArrowField(SequenceNumber()));
+
target_fields.push_back(DataField::ConvertDataFieldToArrowField(ValueKind()));
+ target_fields.insert(target_fields.end(), schema->fields().begin(),
schema->fields().end());
+ return arrow::schema(target_fields);
+ }
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/table/special_fields_test.cpp
b/src/paimon/common/table/special_fields_test.cpp
new file mode 100644
index 0000000..1808fad
--- /dev/null
+++ b/src/paimon/common/table/special_fields_test.cpp
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/table/special_fields.h"
+
+#include <memory>
+
+#include "arrow/type.h"
+#include "gtest/gtest.h"
+
+namespace paimon::test {
+
+TEST(SpecialFieldsTest, TestSequenceNumberField) {
+ ASSERT_EQ(SpecialFields::SequenceNumber().Id(),
std::numeric_limits<int32_t>::max() - 1);
+ ASSERT_EQ(SpecialFields::SequenceNumber().Name(), "_SEQUENCE_NUMBER");
+ ASSERT_EQ(SpecialFields::SequenceNumber().Type()->id(),
arrow::Type::INT64);
+}
+
+TEST(SpecialFieldsTest, TestValueKindField) {
+ ASSERT_EQ(SpecialFields::ValueKind().Id(),
std::numeric_limits<int32_t>::max() - 2);
+ ASSERT_EQ(SpecialFields::ValueKind().Name(), "_VALUE_KIND");
+ ASSERT_EQ(SpecialFields::ValueKind().Type()->id(), arrow::Type::INT8);
+}
+
+TEST(SpecialFieldsTest, TestRowKindField) {
+ ASSERT_EQ(SpecialFields::RowKind().Id(),
std::numeric_limits<int32_t>::max() - 3);
+ ASSERT_EQ(SpecialFields::RowKind().Name(), "rowkind");
+ ASSERT_EQ(SpecialFields::RowKind().Type()->id(), arrow::Type::STRING);
+}
+
+TEST(SpecialFieldsTest, TestRowIdField) {
+ ASSERT_EQ(SpecialFields::RowId().Id(), std::numeric_limits<int32_t>::max()
- 5);
+ ASSERT_EQ(SpecialFields::RowId().Name(), "_ROW_ID");
+ ASSERT_EQ(SpecialFields::RowId().Type()->id(), arrow::Type::INT64);
+}
+
+TEST(SpecialFieldsTest, TestIndexScore) {
+ ASSERT_EQ(SpecialFields::IndexScore().Id(),
std::numeric_limits<int32_t>::max() - 10000 - 1);
+ ASSERT_EQ(SpecialFields::IndexScore().Name(), "_INDEX_SCORE");
+ ASSERT_EQ(SpecialFields::IndexScore().Type()->id(), arrow::Type::FLOAT);
+}
+
+TEST(SpecialFieldsTest, TestKeyValueSpecialFieldCount) {
+ ASSERT_EQ(SpecialFields::KEY_VALUE_SPECIAL_FIELD_COUNT, 2);
+}
+
+TEST(SpecialFieldsTest, TestIsSpecialFieldName) {
+ ASSERT_TRUE(SpecialFields::IsSpecialFieldName("_SEQUENCE_NUMBER"));
+ ASSERT_TRUE(SpecialFields::IsSpecialFieldName("_VALUE_KIND"));
+ ASSERT_FALSE(SpecialFields::IsSpecialFieldName("VALUE_KIND"));
+ ASSERT_TRUE(SpecialFields::IsSpecialFieldName("_ROW_ID"));
+ ASSERT_TRUE(SpecialFields::IsSpecialFieldName("_INDEX_SCORE"));
+}
+
+} // namespace paimon::test