This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 6968ab8  feat: introduce basic data types (#10)
6968ab8 is described below

commit 6968ab8a871d300d8e14022c9c56b4c5095c87c8
Author: lszskye <[email protected]>
AuthorDate: Mon May 25 11:58:37 2026 +0800

    feat: introduce basic data types (#10)
---
 include/paimon/data/blob.h                      | 113 +++++++++
 include/paimon/data/decimal.h                   | 146 ++++++++++++
 include/paimon/data/timestamp.h                 | 139 +++++++++++
 include/paimon/logging.h                        |  73 ++++++
 src/paimon/common/data/blob_defs.h              |  63 +++++
 src/paimon/common/data/blob_descriptor.cpp      | 114 +++++++++
 src/paimon/common/data/blob_descriptor.h        |  92 ++++++++
 src/paimon/common/data/blob_descriptor_test.cpp | 228 ++++++++++++++++++
 src/paimon/common/data/blob_test.cpp            | 188 +++++++++++++++
 src/paimon/common/data/blob_utils.cpp           | 120 ++++++++++
 src/paimon/common/data/blob_utils.h             |  71 ++++++
 src/paimon/common/data/blob_utils_test.cpp      | 145 ++++++++++++
 src/paimon/common/data/decimal.cpp              | 222 ++++++++++++++++++
 src/paimon/common/data/decimal_test.cpp         | 292 ++++++++++++++++++++++++
 src/paimon/common/data/timestamp.cpp            |  63 +++++
 src/paimon/common/data/timestamp_test.cpp       | 103 +++++++++
 src/paimon/common/logging/logging.cpp           |  94 ++++++++
 src/paimon/common/logging/logging_test.cpp      |  42 ++++
 18 files changed, 2308 insertions(+)

diff --git a/include/paimon/data/blob.h b/include/paimon/data/blob.h
new file mode 100644
index 0000000..ff89714
--- /dev/null
+++ b/include/paimon/data/blob.h
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <unordered_map>
+
+#include "paimon/fs/file_system.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/result.h"
+#include "paimon/type_fwd.h"
+#include "paimon/visibility.h"
+
+struct ArrowSchema;
+
+namespace paimon {
+
+/// Represents a binary large object (BLOB) that can be read from various 
sources.
+///
+/// The Blob class provides a unified interface for handling binary data from 
different
+/// sources including file path and blob descriptor. It supports reading data 
through input
+/// streams and provides descriptor-based serialization.
+///
+/// Instances are obtained through the static factory functions below; the only
+/// constructor is private.
+class PAIMON_EXPORT Blob {
+ public:
+    /// Destructor. Declared here and defined out of line.
+    ~Blob();
+
+    /// Creates a Blob from a file path.
+    ///
+    /// @param path The file path to create the blob from.
+    /// @return A result containing the created blob or an error.
+    static Result<std::unique_ptr<Blob>> FromPath(const std::string& path);
+
+    /// Creates a Blob from a file path with specified offset and length.
+    ///
+    /// @param path The file path to create the blob from.
+    /// @param offset The starting offset within the file.
+    /// @param length The length of data to read from the file.
+    /// @return A result containing the created blob or an error.
+    static Result<std::unique_ptr<Blob>> FromPath(const std::string& path, 
int64_t offset,
+                                                  int64_t length);
+
+    /// Creates a Blob from a blob descriptor.
+    ///
+    /// @param buffer The buffer of the blob descriptor.
+    /// @param length The length of the buffer.
+    /// @return A result containing the created blob or an error.
+    static Result<std::unique_ptr<Blob>> FromDescriptor(const char* buffer, 
uint64_t length);
+
+    /// Converts the blob to a blob descriptor.
+    ///
+    /// @param pool The memory pool to use for allocation.
+    /// @return A blob descriptor bytes representing the blob.
+    PAIMON_UNIQUE_PTR<Bytes> ToDescriptor(const std::shared_ptr<MemoryPool>& 
pool) const;
+
+    /// Gets the URI of the blob.
+    ///
+    /// @return A const reference to the URI string.
+    const std::string& Uri() const;
+
+    /// Creates an input stream for reading the blob data.
+    ///
+    /// @param fs The file system to use for reading.
+    /// @return A result containing the input stream or an error.
+    Result<std::unique_ptr<InputStream>> NewInputStream(
+        const std::shared_ptr<FileSystem>& fs) const;
+
+    /// Reads the blob data to bytes.
+    ///
+    /// @param fs The file system to use for reading.
+    /// @param pool The memory pool to use for allocation.
+    /// @return A result containing the blob data bytes or an error.
+    Result<PAIMON_UNIQUE_PTR<Bytes>> ToData(const std::shared_ptr<FileSystem>& 
fs,
+                                            const std::shared_ptr<MemoryPool>& 
pool) const;
+
+    /// Creates an Arrow field definition for the Blob type.
+    ///
+    /// This function constructs an Arrow Field (internally using 
`arrow::large_binary()`)
+    /// and exports it to the C data interface structure `::ArrowSchema`.
+    /// It automatically injects Paimon-specific metadata to identify the 
field as a BLOB.
+    ///
+    /// @param field_name The name of the Arrow field.
+    /// @param metadata A map of key-value metadata to be attached to the 
field.
+    /// @return A result containing a unique pointer to the generated 
`ArrowSchema` or an error.
+    static Result<std::unique_ptr<::ArrowSchema>> ArrowField(
+        const std::string& field_name, std::unordered_map<std::string, 
std::string> metadata = {});
+
+ private:
+    /// Implementation type; only forward-declared in this header.
+    class Impl;
+
+    /// Constructs a Blob from an implementation object. Private, so callers
+    /// must go through the static factory functions.
+    explicit Blob(std::unique_ptr<Impl>&& impl);
+
+    /// Owning pointer to the implementation object.
+    std::unique_ptr<Impl> impl_;
+};
+
+}  // namespace paimon
diff --git a/include/paimon/data/decimal.h b/include/paimon/data/decimal.h
new file mode 100644
index 0000000..707e3e5
--- /dev/null
+++ b/include/paimon/data/decimal.h
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+#include <cstdint>
+#include <cstring>
+#include <string>
+#include <vector>
+
+#include "paimon/visibility.h"
+
+namespace paimon {
+class Bytes;
+
+/// A data structure representing data of Decimal. It might be stored in a compact representation
+/// (as a long value) if values are small enough.
+///
+/// The value is held as an unscaled 128-bit signed integer together with a precision and a scale.
+class PAIMON_EXPORT Decimal {
+ public:
+    using int128_t = __int128_t;
+    using uint128_t = __uint128_t;
+
+    /// Constructs a decimal from a precision, a scale and an unscaled 128-bit value.
+    ///
+    /// The arguments are stored as given; no range validation is performed here.
+    Decimal(int32_t precision, int32_t scale, int128_t value)
+        : precision_(precision), scale_(scale), value_(value) {}
+
+    /// Get the **precision** of this decimal.
+    ///
+    /// The precision is the number of digits in the unscaled value.
+    int32_t Precision() const {
+        return precision_;
+    }
+
+    /// Get the **scale** of this decimal.
+    int32_t Scale() const {
+        return scale_;
+    }
+
+    /// Get the underlying int128_t value of this decimal.
+    int128_t Value() const {
+        return value_;
+    }
+
+    /// Get the low 64 bits of the decimal value.
+    uint64_t LowBits() const {
+        return static_cast<uint64_t>(value_ & 0xFFFFFFFFFFFFFFFF);
+    }
+
+    /// Get the high 64 bits of the decimal value.
+    uint64_t HighBits() const {
+        return static_cast<uint64_t>(value_ >> 64);
+    }
+
+    /// @return Whether the precision of this decimal is at most MAX_COMPACT_PRECISION, i.e.
+    /// whether the unscaled value is small enough to be stored in a long.
+    bool IsCompact() const {
+        return IsCompact(precision_);
+    }
+
+    /// @return A string representation of this decimal (defined out of line).
+    std::string ToString() const;
+
+    /// @return Whether a decimal of the given precision is small enough to be stored in a long,
+    /// i.e. whether `precision` is at most MAX_COMPACT_PRECISION.
+    static bool IsCompact(int32_t precision) {
+        return precision <= MAX_COMPACT_PRECISION;
+    }
+
+    /// Creates an instance of `Decimal` from an unscaled long value and the given precision and
+    /// scale.
+    static Decimal FromUnscaledLong(int64_t unscaled_long, int32_t precision, int32_t scale) {
+        return {precision, scale, unscaled_long};
+    }
+
+    /// @return A long describing the **unscaled value** of this decimal.
+    ///
+    /// Only the low 64 bits of the value are returned; the high 64 bits are dropped.
+    int64_t ToUnscaledLong() const {
+        // Copy the bit pattern instead of casting, so the unsigned-to-signed step does not
+        // depend on implementation-defined integer conversion.
+        const uint64_t low_bits = LowBits();
+        int64_t ret = 0;
+        std::memcpy(&ret, &low_bits, sizeof(ret));
+        return ret;
+    }
+
+    /// @return A byte array describing the **unscaled value** of this decimal.
+    std::vector<char> ToUnscaledBytes() const;
+
+    /// Creates an instance of `Decimal` from an unscaled byte array value and the given precision
+    /// and scale.
+    static Decimal FromUnscaledBytes(int32_t precision, int32_t scale, Bytes* bytes);
+
+    /// Equality as defined by `CompareTo`.
+    bool operator==(const Decimal& other) const {
+        if (this == &other) {
+            return true;
+        }
+        return CompareTo(other) == 0;
+    }
+
+    /// Negation of `operator==`.
+    bool operator!=(const Decimal& other) const {
+        return !(*this == other);
+    }
+
+    /// Ordering as defined by `CompareTo`.
+    bool operator<(const Decimal& other) const {
+        if (this == &other) {
+            return false;
+        }
+        return CompareTo(other) < 0;
+    }
+
+    /// Ordering as defined by `CompareTo`.
+    bool operator>(const Decimal& other) const {
+        if (this == &other) {
+            return false;
+        }
+        return CompareTo(other) > 0;
+    }
+
+    /// Three-way comparison (defined out of line). The operators above treat a negative result
+    /// as "less than", zero as "equal" and a positive result as "greater than".
+    int32_t CompareTo(const Decimal& other) const;
+
+    static constexpr int32_t DEFAULT_PRECISION = 10;
+    static constexpr int32_t DEFAULT_SCALE = 0;
+    static constexpr int32_t MIN_PRECISION = 1;
+    static constexpr int32_t MAX_PRECISION = 38;
+
+ private:
+    /// Largest precision for which `IsCompact` returns true.
+    static constexpr int32_t MAX_COMPACT_PRECISION = 18;
+    static const int128_t INT128_MAXIMUM_VALUE;
+    static const int128_t INT128_MINIMUM_VALUE;
+    static const int64_t POWERS_OF_TEN[MAX_COMPACT_PRECISION + 1];
+
+    static int32_t clz_u128(uint128_t u);
+    static int32_t count_leading_zero_bytes(uint128_t u);
+    static int32_t count_leading_all_ones_bytes(uint128_t u);
+    static int128_t DownScaleInt128(int128_t value, int32_t scale);
+    static int128_t ScaleInt128(int128_t value, int32_t scale, bool* overflow);
+
+ private:
+    int32_t precision_ = 0;
+    int32_t scale_ = 0;
+    int128_t value_ = 0;
+};
+
+}  // namespace paimon
diff --git a/include/paimon/data/timestamp.h b/include/paimon/data/timestamp.h
new file mode 100644
index 0000000..93dfe1b
--- /dev/null
+++ b/include/paimon/data/timestamp.h
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <chrono>
+#include <cstdint>
+#include <string>
+
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+/// A data structure representing data of Timestamp without timezone.
+///
+/// This data structure is immutable and consists of a milliseconds and nanos-of-millisecond since
+/// `1970-01-01 00:00:00`. It might be stored in a compact representation (as a long value) if
+/// values are small enough. Timestamp range from 0000-01-01 00:00:00.000000000 to 9999-12-31
+/// 23:59:59.999999999.
+///
+class PAIMON_EXPORT Timestamp {
+ public:
+    /// Constructs the timestamp `1970-01-01 00:00:00` (zero milliseconds, zero nanoseconds).
+    Timestamp() : Timestamp(0, 0) {}
+
+    /// Constructs a timestamp from milliseconds since the epoch and the nanoseconds within that
+    /// millisecond. The nanosecond range [0, 999999] is only checked by `assert`, so it is not
+    /// enforced in builds where assertions are disabled.
+    Timestamp(int64_t millisecond, int32_t nano_of_millisecond)
+        : millisecond_(millisecond), nano_of_millisecond_(nano_of_millisecond) {
+        assert(nano_of_millisecond >= 0 && nano_of_millisecond <= 999999);
+    }
+
+    /// Get the number of milliseconds since `1970-01-01 00:00:00`.
+    int64_t GetMillisecond() const {
+        return millisecond_;
+    }
+
+    /// Get the number of nanoseconds (the nanoseconds within the milliseconds).
+    ///
+    /// The value range is from 0 to 999,999.
+    int32_t GetNanoOfMillisecond() const {
+        return nano_of_millisecond_;
+    }
+
+    /// Creates an instance of `Timestamp` from milliseconds.
+    ///
+    /// The nanos-of-millisecond field will be set to zero.
+    ///
+    /// @param milliseconds The number of milliseconds since `1970-01-01 00:00:00`; a
+    /// negative number is the number of milliseconds before `1970-01-01 00:00:00`.
+    static Timestamp FromEpochMillis(int64_t milliseconds) {
+        return {milliseconds, 0};
+    }
+
+    /// Creates an instance of `Timestamp` from milliseconds and a nanos-of-millisecond.
+    ///
+    /// @param milliseconds The number of milliseconds since `1970-01-01 00:00:00`; a
+    /// negative number is the number of milliseconds before `1970-01-01 00:00:00`.
+    /// @param nanos_of_millisecond The nanoseconds within the millisecond, from 0 to 999,999.
+    static Timestamp FromEpochMillis(int64_t milliseconds, int32_t nanos_of_millisecond) {
+        return {milliseconds, nanos_of_millisecond};
+    }
+
+    /// Converts this `Timestamp` object to millis `Timestamp` object (ignore
+    /// `nanos_of_millisecond`).
+    Timestamp ToMillisTimestamp() const {
+        return FromEpochMillis(millisecond_);
+    }
+
+    /// Converts this `Timestamp` object to microsecond.
+    int64_t ToMicrosecond() const;
+
+    /// Converts this `Timestamp` object to nanoseconds.
+    ///
+    /// @note No overflow check is performed. A signed 64-bit nanosecond count covers only about
+    /// 292 years on either side of the epoch, which is narrower than the range documented for
+    /// this class.
+    int64_t ToNanosecond() const {
+        const std::chrono::nanoseconds total = std::chrono::nanoseconds(nano_of_millisecond_) +
+                                               std::chrono::milliseconds(millisecond_);
+        return total.count();
+    }
+
+    /// @return Whether a timestamp of the given precision is small enough to be stored in a long
+    /// of milliseconds, i.e. whether `precision` is at most MAX_COMPACT_PRECISION.
+    static bool IsCompact(int32_t precision) {
+        return precision <= MAX_COMPACT_PRECISION;
+    }
+
+    /// Two timestamps are equal when both the millisecond and the nanos-of-millisecond match.
+    bool operator==(const Timestamp& other) const {
+        if (this == &other) {
+            return true;
+        }
+        return millisecond_ == other.millisecond_ &&
+               nano_of_millisecond_ == other.nano_of_millisecond_;
+    }
+
+    /// Negation of `operator==`.
+    bool operator!=(const Timestamp& other) const {
+        return !(*this == other);
+    }
+
+    /// Orders by millisecond first, then by nanos-of-millisecond.
+    bool operator<(const Timestamp& other) const {
+        if (millisecond_ == other.millisecond_) {
+            return nano_of_millisecond_ < other.nano_of_millisecond_;
+        }
+        return millisecond_ < other.millisecond_;
+    }
+
+    /// Mirror of `operator<`.
+    bool operator>(const Timestamp& other) const {
+        return other < *this;
+    }
+
+    /// Converts the Timestamp object to a string representation in UTC (GMT).
+    ///
+    /// The format of the returned string is "YYYY-MM-DD HH:MM:SS.nnnnnnnnn",
+    /// where the date and time are in UTC (GMT), and the nanoseconds are derived
+    /// from the millisecond and nanosecond parts of the timestamp.
+    ///
+    /// @note This method uses UTC (GMT) time zone when formatting the time.
+    /// This is different from the Java Paimon implementation, which may convert
+    /// the timestamp to the local time zone of the machine running the Java process.
+    std::string ToString() const;
+
+    // Precision constants; their values are defined out of line.
+    static const int32_t DEFAULT_PRECISION;
+    static const int32_t MILLIS_PRECISION;
+    static const int32_t MIN_PRECISION;
+    static const int32_t MAX_PRECISION;
+    static const int32_t MAX_COMPACT_PRECISION;
+
+ private:
+    int64_t millisecond_ = 0;
+    int32_t nano_of_millisecond_ = 0;
+};
+
+}  // namespace paimon
diff --git a/include/paimon/logging.h b/include/paimon/logging.h
new file mode 100644
index 0000000..0859d44
--- /dev/null
+++ b/include/paimon/logging.h
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <string>
+
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+// NOTE(review): presumably a key naming the logger implementation - confirm against its users.
+// Being a macro, this name is not scoped by the enclosing `paimon` namespace.
+#define PAIMON_LOGGER_IMPL "logger_impl"
+
+/// Log levels, numbered in ascending order from DEBUG (0) to ERROR (3).
+///
+/// The PAIMON_LOG_* macros in this header build an enumerator name by pasting a level suffix
+/// (DEBUG, INFO, WARN, ERROR) onto `PAIMON_LOG_LEVEL_`.
+enum PaimonLogLevel {
+    PAIMON_LOG_LEVEL_DEBUG = 0,
+    PAIMON_LOG_LEVEL_INFO = 1,
+    PAIMON_LOG_LEVEL_WARN = 2,
+    PAIMON_LOG_LEVEL_ERROR = 3,
+    // NOTE(review): presumably "logging disabled" - confirm against the logger implementation.
+    PAIMON_LOG_LEVEL_NONE = 4,
+    // NOTE(review): looks like an end-of-range sentinel rather than a real level - confirm.
+    PAIMON_LOG_LEVEL_MAX = 5
+};
+
+/// Emits a printf-style log record through `logger` when the given level is enabled.
+///
+/// `level` is the suffix of a PaimonLogLevel enumerator (DEBUG, INFO, WARN or ERROR).
+/// `logger` is any expression that supports `->IsLevelEnabled(...)` and `->LogV(...)`; it is
+/// evaluated twice, so it should have no side effects.
+/// `##__VA_ARGS__` is the GNU comma-elision extension: when no variadic arguments are passed the
+/// preceding comma is dropped, so a plain message without format arguments also compiles.
+#define PAIMON_LOG_V(level, logger, format, ...)                                       \
+    do {                                                                               \
+        if ((logger)->IsLevelEnabled(PAIMON_LOG_LEVEL_##level)) {                      \
+            (logger)->LogV(PAIMON_LOG_LEVEL_##level, __FILE__, __LINE__, __FUNCTION__, \
+                           format, ##__VA_ARGS__);                                     \
+        }                                                                              \
+    } while (0)
+
+/// Convenience wrappers around PAIMON_LOG_V, one per log level.
+#define PAIMON_LOG_INFO(logger, fmt, ...) PAIMON_LOG_V(INFO, logger, fmt, ##__VA_ARGS__)
+#define PAIMON_LOG_ERROR(logger, fmt, ...) PAIMON_LOG_V(ERROR, logger, fmt, ##__VA_ARGS__)
+#define PAIMON_LOG_WARN(logger, fmt, ...) PAIMON_LOG_V(WARN, logger, fmt, ##__VA_ARGS__)
+#define PAIMON_LOG_DEBUG(logger, fmt, ...) PAIMON_LOG_V(DEBUG, logger, fmt, ##__VA_ARGS__)
+
+/// Abstract logging interface used by the PAIMON_LOG_* macros.
+///
+/// Concrete loggers implement `LogV` and `IsLevelEnabled`. The constructor is protected, so the
+/// class can only be instantiated through a derived type.
+class PAIMON_EXPORT Logger {
+ public:
+    /// Factory signature: takes a string and returns a newly created logger.
+    using LoggerCreator = std::function<std::unique_ptr<Logger>(const 
std::string&)>;
+
+    /// Registers a logger factory.
+    // NOTE(review): presumably GetLogger uses the registered creator - confirm in logging.cpp.
+    static void RegisterLogger(LoggerCreator creator);
+
+    /// Returns a logger for the given path.
+    static std::unique_ptr<Logger> GetLogger(const std::string& path);
+
+    /// Writes one log record.
+    ///
+    /// @param level Level of the record.
+    /// @param fname Source file name (the macros pass `__FILE__`).
+    /// @param lineno Source line number (the macros pass `__LINE__`).
+    /// @param function Function name (the macros pass `__FUNCTION__`).
+    /// @param fmt Format string, followed by its variadic arguments.
+    virtual void LogV(PaimonLogLevel level, const char* fname, int lineno, 
const char* function,
+                      const char* fmt, ...) = 0;
+
+    /// @return Whether records of the given level should be emitted. The PAIMON_LOG_* macros
+    /// check this before calling `LogV`.
+    virtual bool IsLevelEnabled(PaimonLogLevel level) const = 0;
+
+    virtual ~Logger() = default;
+
+ protected:
+    Logger() = default;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/data/blob_defs.h 
b/src/paimon/common/data/blob_defs.h
new file mode 100644
index 0000000..6a478e7
--- /dev/null
+++ b/src/paimon/common/data/blob_defs.h
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+
+namespace paimon {
+
+/// Blob file format constants shared between writer and reader.
+///
+/// A Blob field uses the 'large_binary' type as its underlying physical 
storage in Apache Arrow
+/// Schema, and is marked as the Paimon Blob extension type by attaching 
specific
+/// **KeyValueMetadata**. Multiple blob fields in one paimon table are 
supported.
+///
+/// This class only groups constants: both its constructor and destructor are deleted.
+class BlobDefs {
+ public:
+    BlobDefs() = delete;
+    ~BlobDefs() = delete;
+
+    /// To create a Blob field:
+    /// @code
+    ///   std::unordered_map<std::string, std::string> blob_metadata_map = {
+    ///       {BlobDefs::kExtensionTypeKey, BlobDefs::kExtensionTypeValue}
+    ///   };
+    ///   auto field = arrow::field("my_blob_field", arrow::large_binary(), 
false,
+    ///       std::make_shared<arrow::KeyValueMetadata>(blob_metadata_map));
+    /// @endcode
+    /// Metadata key identifying a Paimon Blob extension type field.
+    static constexpr char kExtensionTypeKey[] = "paimon.extension.type";
+    /// Metadata value identifying a Paimon Blob extension type field.
+    static constexpr char kExtensionTypeValue[] = "paimon.type.blob";
+
+    /// A bin_length value of -1 in the index indicates a null blob entry.
+    static constexpr int64_t kNullBinLength = -1;
+    /// Blob file format version.
+    static constexpr int8_t kFileVersion = 1;
+    /// Magic number identifying the start of each blob bin.
+    static constexpr int32_t kMagicNumber = 1481511375;
+    /// Offset from the start of a bin to the actual blob content (magic 
number size).
+    static constexpr int32_t kContentStartOffset = 4;
+    /// Total metadata length per bin: magic(4) + bin_length(8) + crc32(4) = 
16.
+    static constexpr int32_t kTotalMetaLength = 16;
+    /// Blob file header length: index_len(4) + version(1) = 5.
+    static constexpr uint32_t kBlobFileHeaderLength = 5;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/data/blob_descriptor.cpp 
b/src/paimon/common/data/blob_descriptor.cpp
new file mode 100644
index 0000000..f5624c7
--- /dev/null
+++ b/src/paimon/common/data/blob_descriptor.cpp
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/data/blob_descriptor.h"
+
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/common/io/memory_segment_output_stream.h"
+#include "paimon/common/memory/memory_segment_utils.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/io/byte_order.h"
+#include "paimon/io/data_input_stream.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+/// Creates a descriptor for `uri` using the current format version (kCurrentVersion).
+/// Argument validation is delegated to the overload that takes an explicit version.
+Result<std::unique_ptr<BlobDescriptor>> BlobDescriptor::Create(const std::string& uri,
+                                                               int64_t offset, int64_t length) {
+    const int8_t version = kCurrentVersion;
+    return BlobDescriptor::Create(version, uri, offset, length);
+}
+
+/// Creates a descriptor with an explicit format version.
+///
+/// @return An Invalid status when `offset` is negative or `length` is below -1; otherwise the
+/// new descriptor.
+Result<std::unique_ptr<BlobDescriptor>> BlobDescriptor::Create(int8_t version,
+                                                               const std::string& uri,
+                                                               int64_t offset, int64_t length) {
+    const bool negative_offset = offset < 0;
+    if (negative_offset) {
+        return Status::Invalid(fmt::format("offset {} is less than 0", offset));
+    }
+    // -1 is accepted: it marks a dynamic length.
+    const bool length_below_dynamic_marker = length < -1;
+    if (length_below_dynamic_marker) {
+        return Status::Invalid(fmt::format("length {} is less than -1", length));
+    }
+    // The constructor is private, so the object is created with `new` here.
+    return std::unique_ptr<BlobDescriptor>(new BlobDescriptor(version, uri, offset, length));
+}
+
+/// Serializes this descriptor into a newly allocated byte buffer, little-endian.
+///
+/// Written fields, in order: version (int8), magic (int64, only when version > 1),
+/// uri length (int32), uri bytes, offset (int64), length (int64).
+///
+/// @param pool Memory pool used for the output stream and the returned bytes.
+/// @return The serialized descriptor.
+PAIMON_UNIQUE_PTR<Bytes> BlobDescriptor::Serialize(const std::shared_ptr<MemoryPool>& pool) const {
+    MemorySegmentOutputStream out(MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool);
+    out.SetOrder(ByteOrder::PAIMON_LITTLE_ENDIAN);
+    out.WriteValue<int8_t>(version_);
+    // Deserialize() only consumes the magic header when version > 1. Writing it for a version-1
+    // descriptor would make the reader take the magic bytes for the uri length, so the output
+    // could not be read back.
+    if (version_ > 1) {
+        out.WriteValue<int64_t>(kMagic);
+    }
+    out.WriteValue<int32_t>(static_cast<int32_t>(uri_.size()));
+
+    auto uri_bytes = std::make_shared<Bytes>(uri_, pool.get());
+    out.WriteBytes(uri_bytes);
+    out.WriteValue<int64_t>(offset_);
+    out.WriteValue<int64_t>(length_);
+    return MemorySegmentUtils::CopyToBytes(out.Segments(), 0, out.CurrentSize(), pool.get());
+}
+
+/// Parses a descriptor from `size` bytes at `buffer`, little-endian.
+///
+/// Versions newer than kCurrentVersion are rejected. The magic header is only expected, and
+/// checked, when version > 1.
+///
+/// @param buffer Start of the serialized descriptor.
+/// @param size Number of readable bytes at `buffer`.
+/// @return The parsed descriptor, or an error status when the data is malformed or a read fails.
+Result<std::unique_ptr<BlobDescriptor>> BlobDescriptor::Deserialize(const char* buffer,
+                                                                    uint64_t size) {
+    auto input_stream = std::make_shared<ByteArrayInputStream>(buffer, size);
+    DataInputStream in(std::move(input_stream));
+    in.SetOrder(ByteOrder::PAIMON_LITTLE_ENDIAN);
+    PAIMON_ASSIGN_OR_RAISE(int8_t version, in.ReadValue<int8_t>());
+    if (version > kCurrentVersion) {
+        return Status::Invalid(fmt::format(
+            "Expecting BlobDescriptor version to be less than or equal to {}, but found {}.",
+            kCurrentVersion, version));
+    }
+    if (version > 1) {
+        PAIMON_ASSIGN_OR_RAISE(int64_t magic, in.ReadValue<int64_t>());
+        if (kMagic != magic) {
+            return Status::Invalid(fmt::format(
+                "Invalid BlobDescriptor: missing magic header. Expected magic: {}, but found {}",
+                kMagic, magic));
+        }
+    }
+    PAIMON_ASSIGN_OR_RAISE(int32_t uri_length, in.ReadValue<int32_t>());
+    // The length comes from the input, so validate it before allocating. A negative value would
+    // convert to a huge size_t, and the uri can never be longer than the whole buffer.
+    if (uri_length < 0 || static_cast<uint64_t>(uri_length) > size) {
+        return Status::Invalid(fmt::format(
+            "Invalid BlobDescriptor: uri length {} is out of range for a buffer of {} bytes",
+            uri_length, size));
+    }
+    std::string uri(uri_length, '\0');
+    PAIMON_RETURN_NOT_OK(in.Read(uri.data(), uri.size()));
+    PAIMON_ASSIGN_OR_RAISE(int64_t offset, in.ReadValue<int64_t>());
+    PAIMON_ASSIGN_OR_RAISE(int64_t length, in.ReadValue<int64_t>());
+    return BlobDescriptor::Create(version, uri, offset, length);
+}
+
+/// Checks whether the buffer starts like a serialized blob descriptor.
+///
+/// Returns false when the buffer is shorter than kMinDescriptorLength or the version byte is
+/// newer than kCurrentVersion. Otherwise the result is whether the eight bytes after the version
+/// equal kMagic. Read failures are propagated as errors.
+Result<bool> BlobDescriptor::IsBlobDescriptor(const char* buffer, uint64_t size) {
+    const bool holds_version_and_magic = size >= kMinDescriptorLength;
+    if (!holds_version_and_magic) {
+        return false;
+    }
+    DataInputStream reader(std::make_shared<ByteArrayInputStream>(buffer, size));
+    reader.SetOrder(ByteOrder::PAIMON_LITTLE_ENDIAN);
+    PAIMON_ASSIGN_OR_RAISE(int8_t header_version, reader.ReadValue<int8_t>());
+    if (header_version > kCurrentVersion) {
+        return false;
+    }
+    PAIMON_ASSIGN_OR_RAISE(int64_t header_magic, reader.ReadValue<int64_t>());
+    return header_magic == kMagic;
+}
+
+/// Formats the version, uri, offset and length of this descriptor as a single-line string.
+std::string BlobDescriptor::ToString() const {
+    return fmt::format(
+        "BlobDescriptor{{version={}, uri='{}', offset={}, length={}}}",
+        version_, uri_, offset_, length_);
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/data/blob_descriptor.h 
b/src/paimon/common/data/blob_descriptor.h
new file mode 100644
index 0000000..6664b85
--- /dev/null
+++ b/src/paimon/common/data/blob_descriptor.h
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/result.h"
+
+namespace paimon {
+/// Blob descriptor to describe a blob reference.
+/// Memory Layout Description: All multi-byte numerical values (int/long) are 
stored using Little
+/// Endian byte order.
+///
+/// | Offset | Field Name    | Type      | Size |
+/// |--------|---------------|-----------|------|
+/// | 0      | version       | byte      | 1    |
+/// | 1      | magic_number  | long      | 8    |
+/// | 9      | uri_length    | int       | 4    |
+/// | 13     | uri_bytes     | byte[N]   | N    |
+/// | 13 + N | offset        | long      | 8    |
+/// | 21 + N | length        | long      | 8    |
+///
+/// Note: Deserialize() only expects the magic_number field when version > 1.
+
+class BlobDescriptor {
+ public:
+    /// Creates a descriptor using the current format version (kCurrentVersion).
+    static Result<std::unique_ptr<BlobDescriptor>> Create(const std::string& 
uri, int64_t offset,
+                                                          int64_t length);
+
+    /// Creates a descriptor with an explicit format version.
+    static Result<std::unique_ptr<BlobDescriptor>> Create(int8_t version, 
const std::string& uri,
+                                                          int64_t offset, 
int64_t length);
+
+    /// Parses a descriptor from `size` bytes at `buffer`.
+    static Result<std::unique_ptr<BlobDescriptor>> Deserialize(const char* 
buffer, uint64_t size);
+
+    /// Checks whether the buffer starts with a supported version byte followed by the magic.
+    static Result<bool> IsBlobDescriptor(const char* buffer, uint64_t size);
+
+    /// Serializes this descriptor into bytes allocated from `pool`.
+    PAIMON_UNIQUE_PTR<Bytes> Serialize(const std::shared_ptr<MemoryPool>& 
pool) const;
+
+    /// Returns a single-line string showing version, uri, offset and length.
+    std::string ToString() const;
+
+    /// Format version of this descriptor.
+    int8_t Version() const {
+        return version_;
+    }
+
+    /// URI of the referenced blob.
+    const std::string& Uri() const {
+        return uri_;
+    }
+
+    /// Offset of the blob; Create() rejects negative values.
+    int64_t Offset() const {
+        return offset_;
+    }
+
+    /// Length of the blob; -1 means the length is dynamic.
+    int64_t Length() const {
+        return length_;
+    }
+
+ private:
+    /// Private: instances are created through Create() or Deserialize().
+    BlobDescriptor(int8_t version, const std::string& uri, int64_t offset, 
int64_t length)
+        : version_(version), uri_(uri), offset_(offset), length_(length) {}
+
+ private:
+    /// Magic header value. Its bytes, most significant first, are the ASCII text "BLOBDESC".
+    static constexpr int64_t kMagic = 0x424C4F4244455343l;
+    /// one byte for version, eight bytes for magic number.
+    static constexpr uint64_t kMinDescriptorLength = 9;
+    /// Newest format version; Deserialize() rejects anything greater.
+    static constexpr int8_t kCurrentVersion = 2;
+
+    const int8_t version_ = kCurrentVersion;
+    std::string uri_;
+    int64_t offset_ = 0;
+    int64_t length_ = -1;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/data/blob_descriptor_test.cpp 
b/src/paimon/common/data/blob_descriptor_test.cpp
new file mode 100644
index 0000000..a5221bf
--- /dev/null
+++ b/src/paimon/common/data/blob_descriptor_test.cpp
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/data/blob_descriptor.h"
+
+#include <memory>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+/// Fixture handing every test case a memory pool and a ready-made descriptor
+/// (uri "test_uri", offset 1024, length 2048).
+class BlobDescriptorTest : public testing::Test {
+ public:
+    void SetUp() override {
+        pool_ = GetDefaultPool();
+        ASSERT_OK_AND_ASSIGN(descriptor_,
+                             BlobDescriptor::Create("test_uri", /*offset=*/1024, /*length=*/2048));
+    }
+
+    // TEST_F bodies are compiled as members of a class derived from this fixture, so shared
+    // state must be protected (not private) to be reachable under standard access rules.
+ protected:
+    std::shared_ptr<MemoryPool> pool_;
+    std::unique_ptr<BlobDescriptor> descriptor_;
+};
+
+TEST_F(BlobDescriptorTest, TestConstructorAndGetters) {
+    // The fixture descriptor must report exactly the values that were handed to Create().
+    const BlobDescriptor& created = *descriptor_;
+    ASSERT_EQ(created.Uri(), "test_uri");
+    ASSERT_EQ(created.Offset(), 1024);
+    ASSERT_EQ(created.Length(), 2048);
+}
+
+TEST_F(BlobDescriptorTest, TestDeserializeCompatibilityForJavaWithVersion1) {
+    // Version 1 layout: version byte, then uri length, uri bytes, offset and length.
+    // There is no magic number in this version.
+    const std::vector<char> java_bytes = {
+        1,                                      // version
+        8,   0,   0,   0,                       // uri_length
+        116, 101, 115, 116, 95, 117, 114, 105,  // uri_bytes: "test_uri"
+        0,   4,   0,   0,   0,  0,   0,   0,    // offset = 1024
+        0,   8,   0,   0,   0,  0,   0,   0};   // length = 2048
+    const std::string java_serialized(java_bytes.data(), java_bytes.size());
+
+    ASSERT_OK_AND_ASSIGN(
+        auto parsed, BlobDescriptor::Deserialize(java_serialized.data(), java_serialized.size()));
+    ASSERT_EQ(parsed->Version(), static_cast<int8_t>(1));
+    ASSERT_EQ(parsed->Uri(), "test_uri");
+    ASSERT_EQ(parsed->Offset(), 1024);
+    ASSERT_EQ(parsed->Length(), 2048);
+}
+
+TEST_F(BlobDescriptorTest, TestDeserializeCompatibilityForJavaWithVersion2) {
+    // Version 2 layout: version byte, magic number, uri length, uri bytes, offset, length.
+    const std::vector<char> java_bytes = {
+        2,                                      // version
+        67,  83,  69,  68,  66, 79,  76,  66,   // magic_number
+        8,   0,   0,   0,                       // uri_length
+        116, 101, 115, 116, 95, 117, 114, 105,  // uri_bytes: "test_uri"
+        0,   4,   0,   0,   0,  0,   0,   0,    // offset = 1024
+        0,   8,   0,   0,   0,  0,   0,   0};   // length = 2048
+    const std::string java_serialized(java_bytes.data(), java_bytes.size());
+
+    ASSERT_OK_AND_ASSIGN(
+        auto parsed, BlobDescriptor::Deserialize(java_serialized.data(), java_serialized.size()));
+    ASSERT_EQ(parsed->Version(), static_cast<int8_t>(2));
+    ASSERT_EQ(parsed->Uri(), "test_uri");
+    ASSERT_EQ(parsed->Offset(), 1024);
+    ASSERT_EQ(parsed->Length(), 2048);
+
+    // Serializing the parsed descriptor again must reproduce the Java bytes exactly.
+    PAIMON_UNIQUE_PTR<Bytes> reserialized = parsed->Serialize(pool_);
+    const std::string cpp_serialized(reserialized->data(), reserialized->size());
+    ASSERT_EQ(cpp_serialized, java_serialized);
+}
+
+TEST_F(BlobDescriptorTest, TestSerializeDeserializeWithEmptyUri) {
+    // An empty uri with a zero offset and length has to survive a serialization round trip.
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<BlobDescriptor> original,
+                         BlobDescriptor::Create(/*uri=*/"", /*offset=*/0, /*length=*/0));
+    auto encoded = original->Serialize(pool_);
+    ASSERT_OK_AND_ASSIGN(auto decoded,
+                         BlobDescriptor::Deserialize(encoded->data(), encoded->size()));
+
+    ASSERT_EQ(decoded->Uri(), "");
+    ASSERT_EQ(decoded->Offset(), 0);
+    ASSERT_EQ(decoded->Length(), 0);
+}
+
+TEST_F(BlobDescriptorTest, TestSerializeDeserializeWithDynamicLength) {
+    // A length of -1 has to survive a serialization round trip unchanged.
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<BlobDescriptor> dynamic_length_descriptor,
+                         BlobDescriptor::Create("test_uri", /*offset=*/0, /*length=*/-1));
+    auto encoded = dynamic_length_descriptor->Serialize(pool_);
+    ASSERT_OK_AND_ASSIGN(auto decoded,
+                         BlobDescriptor::Deserialize(encoded->data(), encoded->size()));
+
+    ASSERT_EQ(decoded->Uri(), "test_uri");
+    ASSERT_EQ(decoded->Offset(), 0);
+    ASSERT_EQ(decoded->Length(), -1);
+}
+
+TEST_F(BlobDescriptorTest, TestInvalidParameters) {
+    {
+        // Deserialize: a version byte above the supported maximum is rejected.
+        ASSERT_OK_AND_ASSIGN(std::unique_ptr<BlobDescriptor> valid,
+                             BlobDescriptor::Create(/*uri=*/"test", /*offset=*/1, /*length=*/2));
+        auto encoded = valid->Serialize(pool_);
+        (*encoded)[0] = '\x03';
+        ASSERT_NOK_WITH_MSG(
+            BlobDescriptor::Deserialize(encoded->data(), encoded->size()),
+            "Expecting BlobDescriptor version to be less than or equal to 2, but found 3");
+    }
+    {
+        // Deserialize: a size that cuts the buffer short is rejected.
+        ASSERT_OK_AND_ASSIGN(std::unique_ptr<BlobDescriptor> valid,
+                             BlobDescriptor::Create(/*uri=*/"test", /*offset=*/1, /*length=*/2));
+        auto encoded = valid->Serialize(pool_);
+        ASSERT_NOK(BlobDescriptor::Deserialize(encoded->data(), /*size=*/5));
+    }
+    {
+        // Create: negative offsets are rejected.
+        ASSERT_NOK_WITH_MSG(BlobDescriptor::Create(/*uri=*/"test", /*offset=*/-1, /*length=*/2),
+                            "offset -1 is less than 0");
+    }
+    {
+        // Create: lengths below -1 are rejected.
+        ASSERT_NOK_WITH_MSG(BlobDescriptor::Create(/*uri=*/"test", /*offset=*/1, /*length=*/-2),
+                            "length -2 is less than -1");
+    }
+}
+
+TEST_F(BlobDescriptorTest, TestToString) {
+    // The textual form must mention every attribute of the fixture descriptor.
+    const std::string text = descriptor_->ToString();
+    const auto mentions = [&text](const char* fragment) {
+        return text.find(fragment) != std::string::npos;
+    };
+    ASSERT_FALSE(text.empty());
+    ASSERT_TRUE(mentions("version=2"));
+    ASSERT_TRUE(mentions("uri='test_uri'"));
+    ASSERT_TRUE(mentions("offset=1024"));
+    ASSERT_TRUE(mentions("length=2048"));
+}
+
+TEST_F(BlobDescriptorTest, TestRoundTripConsistency) {
+    // serialize -> deserialize -> serialize has to be byte-stable ...
+    auto bytes_pass_one = descriptor_->Serialize(pool_);
+    ASSERT_OK_AND_ASSIGN(
+        auto restored_pass_one,
+        BlobDescriptor::Deserialize(bytes_pass_one->data(), bytes_pass_one->size()));
+    auto bytes_pass_two = restored_pass_one->Serialize(pool_);
+    ASSERT_EQ(*bytes_pass_one, *bytes_pass_two);
+
+    // ... and a second deserialization still has to yield the original attributes.
+    ASSERT_OK_AND_ASSIGN(
+        auto restored_pass_two,
+        BlobDescriptor::Deserialize(bytes_pass_two->data(), bytes_pass_two->size()));
+    ASSERT_EQ(restored_pass_two->Uri(), "test_uri");
+    ASSERT_EQ(restored_pass_two->Offset(), 1024);
+    ASSERT_EQ(restored_pass_two->Length(), 2048);
+}
+
+TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithValidDescriptor) {
+    // Bytes produced by Serialize() for the (version 2) fixture descriptor must be recognized.
+    auto encoded = descriptor_->Serialize(pool_);
+    ASSERT_OK_AND_ASSIGN(bool recognized,
+                         BlobDescriptor::IsBlobDescriptor(encoded->data(), encoded->size()));
+    ASSERT_TRUE(recognized);
+}
+
+TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithTooShortBuffer) {
+    // Eight bytes: one short of the version byte plus the eight magic bytes.
+    std::vector<char> eight_bytes = {0x02, 0x43, 0x53, 0x45, 0x44, 0x42, 0x4F, 0x4C};
+    ASSERT_OK_AND_ASSIGN(
+        bool short_recognized,
+        BlobDescriptor::IsBlobDescriptor(eight_bytes.data(), eight_bytes.size()));
+    ASSERT_FALSE(short_recognized);
+
+    // No buffer at all.
+    ASSERT_OK_AND_ASSIGN(bool empty_recognized, BlobDescriptor::IsBlobDescriptor(nullptr, 0));
+    ASSERT_FALSE(empty_recognized);
+}
+
+TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithFutureVersion) {
+    // A version byte above the current version means "not a descriptor", not an error.
+    auto encoded = descriptor_->Serialize(pool_);
+    (*encoded)[0] = '\x03';  // overwrite the version byte with 3
+    ASSERT_OK_AND_ASSIGN(bool recognized,
+                         BlobDescriptor::IsBlobDescriptor(encoded->data(), encoded->size()));
+    ASSERT_FALSE(recognized);
+}
+
+TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithWrongMagic) {
+    // Zeroing the first two magic bytes (the magic number occupies bytes 1-8) must make the
+    // buffer unrecognizable.
+    auto encoded = descriptor_->Serialize(pool_);
+    (*encoded)[1] = '\x00';
+    (*encoded)[2] = '\x00';
+    ASSERT_OK_AND_ASSIGN(bool recognized,
+                         BlobDescriptor::IsBlobDescriptor(encoded->data(), encoded->size()));
+    ASSERT_FALSE(recognized);
+}
+
+TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithRandomData) {
+    // Ten arbitrary bytes that do not follow the descriptor format.
+    std::vector<char> arbitrary = {0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09};
+    ASSERT_OK_AND_ASSIGN(bool recognized,
+                         BlobDescriptor::IsBlobDescriptor(arbitrary.data(), arbitrary.size()));
+    ASSERT_FALSE(recognized);
+}
+
+TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithVersion1Data) {
+    // Version 1 data stores the uri length right after the version byte, so reading bytes 1-8
+    // as a magic number cannot match and the buffer is reported as "not a descriptor".
+    std::vector<char> version1_data = {
+        1,                                      // version
+        8,   0,   0,   0,                       // uri_length
+        116, 101, 115, 116, 95, 117, 114, 105,  // uri_bytes: "test_uri"
+        0,   4,   0,   0,   0,  0,   0,   0,    // offset = 1024
+        0,   8,   0,   0,   0,  0,   0,   0};   // length = 2048
+    ASSERT_OK_AND_ASSIGN(
+        bool recognized,
+        BlobDescriptor::IsBlobDescriptor(version1_data.data(), version1_data.size()));
+    ASSERT_FALSE(recognized);
+}
+
+TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithExactly9Bytes) {
+    // The smallest recognizable buffer: version 2 followed by the magic number
+    // 0x424C4F4244455343 stored in little-endian byte order.
+    std::vector<char> nine_bytes = {0x02, 0x43, 0x53, 0x45, 0x44, 0x42, 0x4F, 0x4C, 0x42};
+    ASSERT_OK_AND_ASSIGN(bool recognized,
+                         BlobDescriptor::IsBlobDescriptor(nine_bytes.data(), nine_bytes.size()));
+    ASSERT_TRUE(recognized);
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/data/blob_test.cpp 
b/src/paimon/common/data/blob_test.cpp
new file mode 100644
index 0000000..4c95711
--- /dev/null
+++ b/src/paimon/common/data/blob_test.cpp
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/data/blob.h"
+
+#include "arrow/api.h"
+#include "arrow/c/bridge.h"
+#include "gtest/gtest.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+/// Fixture that writes the 14-byte content "abcdefghijklmn" to "<dir>/file.blob" inside a
+/// directory obtained from UniqueTestDirectory, and exposes the path, a local file system and
+/// a memory pool to the test cases.
+class BlobTest : public ::testing::Test {
+ public:
+    void SetUp() override {
+        pool_ = GetDefaultPool();
+        dir_ = UniqueTestDirectory::Create();
+        ASSERT_TRUE(dir_);
+        uri_ = dir_->Str() + "/file.blob";
+        file_system_ = std::make_shared<LocalFileSystem>();
+        // NOTE(review): the trailing `true` is presumably an overwrite flag -- confirm against
+        // the FileSystem::WriteFile declaration.
+        ASSERT_OK(file_system_->WriteFile(uri_, "abcdefghijklmn", true));
+    }
+    void TearDown() override {}
+
+    // TEST_F bodies are compiled as members of a class derived from this fixture, so shared
+    // state must be protected (not private) to be reachable under standard access rules.
+ protected:
+    std::shared_ptr<MemoryPool> pool_;
+    std::unique_ptr<UniqueTestDirectory> dir_;
+    std::shared_ptr<FileSystem> file_system_;
+    std::string uri_;
+};
+
+TEST_F(BlobTest, TestSimple) {
+    // path -> descriptor bytes -> blob must reproduce the descriptor, the uri and the content.
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<Blob> source_blob, Blob::FromPath(uri_));
+    auto descriptor_bytes = source_blob->ToDescriptor(pool_);
+
+    ASSERT_OK_AND_ASSIGN(auto reloaded, Blob::FromDescriptor(descriptor_bytes->data(),
+                                                             descriptor_bytes->size()));
+    ASSERT_EQ(*reloaded->ToDescriptor(pool_), *descriptor_bytes);
+    ASSERT_OK_AND_ASSIGN(auto stream, reloaded->NewInputStream(file_system_));
+    ASSERT_EQ(uri_, reloaded->Uri());
+    std::string content(14, '\0');
+    ASSERT_OK(stream->Read(content.data(), content.size()));
+    ASSERT_EQ("abcdefghijklmn", content);
+}
+
+TEST_F(BlobTest, TestInvalidParameters) {
+    // NewInputStream must refuse a null file system.
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<Blob> valid_blob, Blob::FromPath(uri_));
+    ASSERT_NOK_WITH_MSG(valid_blob->NewInputStream(nullptr), "file system is nullptr");
+
+    // FromDescriptor must refuse bytes that are not a serialized descriptor.
+    const std::string garbage = "invalid_descriptor_bytes";
+    ASSERT_NOK(Blob::FromDescriptor(garbage.data(), garbage.size()));
+}
+
+TEST_F(BlobTest, TestRoundTripConsistency) {
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<Blob> source_blob,
+                         Blob::FromPath(uri_, /*offset=*/1, /*length=*/5));
+
+    // blob -> descriptor -> blob -> descriptor must yield identical descriptor bytes.
+    auto descriptor_pass_one = source_blob->ToDescriptor(pool_);
+    ASSERT_OK_AND_ASSIGN(auto reloaded, Blob::FromDescriptor(descriptor_pass_one->data(),
+                                                             descriptor_pass_one->size()));
+
+    auto descriptor_pass_two = reloaded->ToDescriptor(pool_);
+    ASSERT_EQ(*descriptor_pass_one, *descriptor_pass_two);
+
+    // The uri must be unchanged by the round trip as well.
+    ASSERT_EQ(source_blob->Uri(), reloaded->Uri());
+}
+
+TEST_F(BlobTest, TestInputStreamCreation) {
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<Blob> whole_file_blob, Blob::FromPath(uri_));
+    ASSERT_OK_AND_ASSIGN(auto stream, whole_file_blob->NewInputStream(file_system_));
+    ASSERT_TRUE(stream);
+
+    // Without an explicit offset/length the stream delivers the complete 14-byte file.
+    std::string content(14, '\0');
+    ASSERT_OK_AND_ASSIGN(auto read_count, stream->Read(content.data(), content.size()));
+    ASSERT_EQ(14, read_count);
+    ASSERT_EQ("abcdefghijklmn", content);
+}
+
+TEST_F(BlobTest, TestBoundaryConditions) {
+    // An empty path is rejected up front.
+    ASSERT_NOK_WITH_MSG(Blob::FromPath(""), "path is an empty string");
+
+    // The largest representable offset must survive a descriptor round trip.
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<Blob> far_offset_blob,
+                         Blob::FromPath(uri_, /*offset=*/INT64_MAX, /*length=*/1));
+    auto descriptor_bytes = far_offset_blob->ToDescriptor(pool_);
+
+    ASSERT_OK_AND_ASSIGN(auto reloaded, Blob::FromDescriptor(descriptor_bytes->data(),
+                                                             descriptor_bytes->size()));
+    ASSERT_EQ(*reloaded->ToDescriptor(pool_), *descriptor_bytes);
+}
+
+TEST_F(BlobTest, TestNewInputStreamWithOffsetAndLength) {
+    // offset=2, length=6 selects "cdefgh" out of "abcdefghijklmn".
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<Blob> ranged_blob,
+                         Blob::FromPath(uri_, /*offset=*/2, /*length=*/6));
+    ASSERT_OK_AND_ASSIGN(auto stream, ranged_blob->NewInputStream(file_system_));
+    ASSERT_TRUE(stream);
+
+    // The stream reports the length of the range, not of the file.
+    ASSERT_OK_AND_ASSIGN(uint64_t stream_length, stream->Length());
+    ASSERT_EQ(6, stream_length);
+
+    // Reading returns exactly the bytes inside the range.
+    std::string content(6, '\0');
+    ASSERT_OK_AND_ASSIGN(auto read_count, stream->Read(content.data(), content.size()));
+    ASSERT_EQ(6, read_count);
+    ASSERT_EQ("cdefgh", content);
+}
+
+TEST_F(BlobTest, TestNewInputStreamWithDynamicLength) {
+    // offset=2 with length=-1 means "from byte 2 to the end of the file".
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<Blob> open_ended_blob,
+                         Blob::FromPath(uri_, /*offset=*/2, /*length=*/-1));
+    ASSERT_OK_AND_ASSIGN(auto stream, open_ended_blob->NewInputStream(file_system_));
+    ASSERT_TRUE(stream);
+
+    // 14 bytes in the file minus the 2 skipped ones.
+    ASSERT_OK_AND_ASSIGN(uint64_t stream_length, stream->Length());
+    ASSERT_EQ(12, stream_length);
+
+    // Reading returns everything from the offset onwards: "cdefghijklmn".
+    std::string content(12, '\0');
+    ASSERT_OK_AND_ASSIGN(auto read_count, stream->Read(content.data(), content.size()));
+    ASSERT_EQ(12, read_count);
+    ASSERT_EQ("cdefghijklmn", content);
+}
+
+TEST_F(BlobTest, TestArrowField) {
+    {
+        // Only a field name is given: the field must be non-nullable and tagged as a blob.
+        ASSERT_OK_AND_ASSIGN(auto schema, Blob::ArrowField("my_blob"));
+        ASSERT_NE(schema, nullptr);
+
+        // Import the exported C schema back into an arrow::Field to inspect it.
+        auto field_result = arrow::ImportField(schema.get());
+        ASSERT_TRUE(field_result.ok());
+        auto field = field_result.ValueUnsafe();
+
+        ASSERT_EQ(field->name(), "my_blob");
+        ASSERT_EQ(field->type()->id(), arrow::Type::LARGE_BINARY);
+        ASSERT_FALSE(field->nullable());
+        ASSERT_TRUE(field->HasMetadata());
+        auto extension_type = field->metadata()->Get("paimon.extension.type");
+        ASSERT_TRUE(extension_type.ok());
+        ASSERT_EQ(extension_type.ValueUnsafe(), "paimon.type.blob");
+    }
+    {
+        // Caller-supplied metadata must be kept next to the blob marker.
+        std::unordered_map<std::string, std::string> custom_metadata = {
+            {"custom_key", "custom_value"}};
+        ASSERT_OK_AND_ASSIGN(auto schema, Blob::ArrowField("meta_blob", custom_metadata));
+        // Check the import status before touching the value: ValueUnsafe() on a failed
+        // result would not fail the test cleanly.
+        auto field_result = arrow::ImportField(schema.get());
+        ASSERT_TRUE(field_result.ok());
+        auto field = field_result.ValueUnsafe();
+        ASSERT_EQ(field->name(), "meta_blob");
+        ASSERT_FALSE(field->nullable());
+        ASSERT_TRUE(field->HasMetadata());
+        // blob extension metadata should be present
+        auto extension_type = field->metadata()->Get("paimon.extension.type");
+        ASSERT_TRUE(extension_type.ok());
+        ASSERT_EQ(extension_type.ValueUnsafe(), "paimon.type.blob");
+        // custom metadata should also be present
+        auto custom_val = field->metadata()->Get("custom_key");
+        ASSERT_TRUE(custom_val.ok());
+        ASSERT_EQ(custom_val.ValueUnsafe(), "custom_value");
+    }
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/data/blob_utils.cpp 
b/src/paimon/common/data/blob_utils.cpp
new file mode 100644
index 0000000..9eb716a
--- /dev/null
+++ b/src/paimon/common/data/blob_utils.cpp
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/data/blob_utils.h"
+
+#include <cstddef>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/type.h"
+#include "paimon/common/data/blob_defs.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/string_utils.h"
+
+namespace arrow {
+class Array;
+}
+
+namespace paimon {
+
+/// Splits `schema` into its non-blob fields and its blob fields, keeping the original field
+/// order on both sides. Either resulting schema may be empty.
+BlobUtils::SeparatedSchemas BlobUtils::SeparateBlobSchema(
+    const std::shared_ptr<arrow::Schema>& schema) {
+    std::vector<std::shared_ptr<arrow::Field>> blob_side;
+    std::vector<std::shared_ptr<arrow::Field>> main_side;
+    const int field_count = schema->num_fields();
+    for (int idx = 0; idx < field_count; ++idx) {
+        const std::shared_ptr<arrow::Field> candidate = schema->field(idx);
+        // Route each field to exactly one side.
+        auto& destination = IsBlobField(candidate) ? blob_side : main_side;
+        destination.emplace_back(candidate);
+    }
+    SeparatedSchemas separated;
+    separated.main_schema = arrow::schema(main_side);
+    separated.blob_schema = arrow::schema(blob_side);
+    return separated;
+}
+
+/// Splits the children of `struct_array` into a struct array holding the non-blob columns and
+/// one holding the blob columns, keeping the original column order on both sides.
+///
+/// A side without any column becomes a zero-column struct array with the same number of rows
+/// as `struct_array`, so inputs with no blob column (or only blob columns) are handled the
+/// same way SeparateBlobSchema() handles them.
+///
+/// NOTE(review): the validity bitmap of `struct_array` itself is not carried over to the
+/// results -- confirm callers never pass structs with top-level nulls.
+///
+/// @return the two struct arrays, or the converted Arrow error if one cannot be assembled.
+Result<BlobUtils::SeparatedStructArrays> BlobUtils::SeparateBlobArray(
+    const std::shared_ptr<arrow::StructArray>& struct_array) {
+    std::shared_ptr<arrow::StructType> old_type =
+        std::static_pointer_cast<arrow::StructType>(struct_array->type());
+    const auto& old_fields = old_type->fields();
+    const auto& old_arrays = struct_array->fields();
+
+    std::vector<std::shared_ptr<arrow::Field>> remaining_fields;
+    std::vector<std::shared_ptr<arrow::Array>> remaining_arrays;
+    std::vector<std::shared_ptr<arrow::Field>> blob_fields;
+    std::vector<std::shared_ptr<arrow::Array>> blob_arrays;
+
+    for (size_t i = 0; i < old_fields.size(); i++) {
+        if (IsBlobField(old_fields[i])) {
+            blob_fields.push_back(old_fields[i]);
+            blob_arrays.push_back(old_arrays[i]);
+        } else {
+            remaining_fields.push_back(old_fields[i]);
+            remaining_arrays.push_back(old_arrays[i]);
+        }
+    }
+
+    // StructArray::Make() infers the row count from its first child and therefore rejects an
+    // empty child list; for that case build the array directly with the known row count.
+    const auto num_rows = struct_array->length();
+    const auto make_struct = [num_rows](const std::vector<std::shared_ptr<arrow::Array>>& arrays,
+                                        const std::vector<std::shared_ptr<arrow::Field>>& fields)
+        -> arrow::Result<std::shared_ptr<arrow::StructArray>> {
+        if (arrays.empty()) {
+            return std::make_shared<arrow::StructArray>(arrow::struct_(fields), num_rows, arrays);
+        }
+        return arrow::StructArray::Make(arrays, fields);
+    };
+
+    SeparatedStructArrays result;
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(result.main_array,
+                                      make_struct(remaining_arrays, remaining_fields));
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(result.blob_array, make_struct(blob_arrays, blob_fields));
+    return result;
+}
+
+/// A field is a blob field when it is of type LARGE_BINARY, has metadata, and that metadata
+/// carries the blob extension marker.
+bool BlobUtils::IsBlobField(const std::shared_ptr<arrow::Field>& field) {
+    const bool is_large_binary = field->type()->id() == arrow::Type::LARGE_BINARY;
+    // Short-circuit evaluation keeps the metadata lookup away from fields that cannot be blobs.
+    return is_large_binary && field->HasMetadata() && IsBlobMetadata(field->metadata());
+}
+
+/// Returns true when `metadata` is non-null and maps BlobDefs::kExtensionTypeKey to
+/// BlobDefs::kExtensionTypeValue.
+bool BlobUtils::IsBlobMetadata(const std::shared_ptr<const arrow::KeyValueMetadata>& metadata) {
+    if (metadata == nullptr) {
+        return false;
+    }
+    const auto lookup = metadata->Get(BlobDefs::kExtensionTypeKey);
+    // A failed lookup means the key is absent, which simply means "not a blob".
+    return lookup.ok() && lookup.ValueUnsafe() == BlobDefs::kExtensionTypeValue;
+}
+
+/// Classifies a file as a blob file purely by its ".blob" name suffix.
+bool BlobUtils::IsBlobFile(const std::string& file_name) {
+    const bool has_blob_suffix = StringUtils::EndsWith(file_name, ".blob");
+    return has_blob_suffix;
+}
+
+/// Builds a LARGE_BINARY field named `field_name` whose metadata consists of `metadata` plus
+/// the blob extension marker.
+std::shared_ptr<arrow::Field> BlobUtils::ToArrowField(
+    const std::string& field_name, bool nullable,
+    std::unordered_map<std::string, std::string> metadata) {
+    // `metadata` is a by-value copy, so tagging it leaves the caller's map untouched. The
+    // marker replaces any caller-supplied value stored under the same key.
+    metadata[BlobDefs::kExtensionTypeKey] = BlobDefs::kExtensionTypeValue;
+    auto tagged_metadata = std::make_shared<arrow::KeyValueMetadata>(metadata);
+    return arrow::field(field_name, arrow::large_binary(), nullable, tagged_metadata);
+}
+}  // namespace paimon
diff --git a/src/paimon/common/data/blob_utils.h 
b/src/paimon/common/data/blob_utils.h
new file mode 100644
index 0000000..505cad5
--- /dev/null
+++ b/src/paimon/common/data/blob_utils.h
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+
+namespace arrow {
+class Field;
+class KeyValueMetadata;
+class Schema;
+class StructArray;
+}  // namespace arrow
+
+namespace paimon {
+/// Static helpers for the blob type: recognizing blob fields, metadata and file names,
+/// building blob fields, and splitting schemas / struct arrays into blob and non-blob parts.
+/// The class cannot be instantiated.
+class PAIMON_EXPORT BlobUtils {
+ public:
+    BlobUtils() = delete;
+    ~BlobUtils() = delete;
+
+    /// Result of SeparateBlobSchema().
+    struct SeparatedSchemas {
+        /// Non-blob fields
+        std::shared_ptr<arrow::Schema> main_schema;
+        /// Blob fields only
+        std::shared_ptr<arrow::Schema> blob_schema;
+    };
+
+    /// Result of SeparateBlobArray().
+    struct SeparatedStructArrays {
+        /// Non-blob fields
+        std::shared_ptr<arrow::StructArray> main_array;
+        /// Blob fields only
+        std::shared_ptr<arrow::StructArray> blob_array;
+    };
+
+    /// Splits `schema` into a schema of its non-blob fields and a schema of its blob fields
+    /// (as decided by IsBlobField()), keeping the original field order within each.
+    static SeparatedSchemas SeparateBlobSchema(const 
std::shared_ptr<arrow::Schema>& schema);
+
+    /// Splits the child columns of `struct_array` into a struct array of the non-blob columns
+    /// and a struct array of the blob columns. Returns a non-OK result if Arrow cannot
+    /// assemble one of the resulting struct arrays.
+    static Result<SeparatedStructArrays> SeparateBlobArray(
+        const std::shared_ptr<arrow::StructArray>& struct_array);
+
+    /// True when `field` is of type LARGE_BINARY and its metadata satisfies IsBlobMetadata().
+    static bool IsBlobField(const std::shared_ptr<arrow::Field>& field);
+    /// True when `metadata` is non-null and maps the blob extension-type key to the blob
+    /// extension-type value (see BlobDefs).
+    static bool IsBlobMetadata(const std::shared_ptr<const 
arrow::KeyValueMetadata>& metadata);
+    /// True when `file_name` ends with ".blob".
+    static bool IsBlobFile(const std::string& file_name);
+
+    /// Creates a LARGE_BINARY field named `field_name` with the given nullability. The blob
+    /// extension-type entry is added to `metadata`, replacing any entry with the same key.
+    static std::shared_ptr<arrow::Field> ToArrowField(
+        const std::string& field_name, bool nullable = false,
+        std::unordered_map<std::string, std::string> metadata = {});
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/data/blob_utils_test.cpp 
b/src/paimon/common/data/blob_utils_test.cpp
new file mode 100644
index 0000000..d042ecd
--- /dev/null
+++ b/src/paimon/common/data/blob_utils_test.cpp
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/data/blob_utils.h"
+
+#include "arrow/api.h"
+#include "arrow/c/bridge.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/blob_defs.h"
+#include "paimon/data/blob.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+/// Fixture offering a helper that builds metadata carrying only the blob extension marker.
+class BlobUtilsTest : public ::testing::Test {
+    // TEST_F bodies are compiled as members of a class derived from this fixture, so the
+    // helper must be protected (not private) to be callable under standard access rules.
+ protected:
+    /// Returns metadata whose single entry maps the blob extension-type key to the blob
+    /// extension-type value.
+    std::shared_ptr<arrow::KeyValueMetadata> CreateBlobMetadata() {
+        std::unordered_map<std::string, std::string> blob_metadata_map = {
+            {BlobDefs::kExtensionTypeKey, BlobDefs::kExtensionTypeValue}};
+        return std::make_shared<arrow::KeyValueMetadata>(blob_metadata_map);
+    }
+};
+
+TEST_F(BlobUtilsTest, IsBlobMetadata) {
+    // Marker key with marker value: accepted.
+    auto blob_marker = CreateBlobMetadata();
+    EXPECT_TRUE(BlobUtils::IsBlobMetadata(blob_marker));
+
+    // No metadata at all: rejected.
+    EXPECT_FALSE(BlobUtils::IsBlobMetadata(nullptr));
+
+    // Marker key with a different value: rejected.
+    std::unordered_map<std::string, std::string> other_value_entries = {
+        {BlobDefs::kExtensionTypeKey, "paimon.type.varchar"}};
+    auto other_value = std::make_shared<arrow::KeyValueMetadata>(other_value_entries);
+    EXPECT_FALSE(BlobUtils::IsBlobMetadata(other_value));
+
+    // Marker value stored under a different key: rejected.
+    std::unordered_map<std::string, std::string> other_key_entries = {
+        {"other_key", BlobDefs::kExtensionTypeValue}};
+    auto other_key = std::make_shared<arrow::KeyValueMetadata>(other_key_entries);
+    EXPECT_FALSE(BlobUtils::IsBlobMetadata(other_key));
+}
+
+TEST_F(BlobUtilsTest, IsBlobField) {
+    // A field produced by ToArrowField() is a blob field.
+    std::shared_ptr<arrow::Field> genuine_blob = BlobUtils::ToArrowField("f1", true);
+    EXPECT_TRUE(BlobUtils::IsBlobField(genuine_blob));
+
+    // Wrong physical type.
+    auto int32_column = arrow::field("i_int", arrow::int32());
+    EXPECT_FALSE(BlobUtils::IsBlobField(int32_column));
+
+    // Right physical type, but no metadata.
+    auto large_binary_plain = arrow::field("b_no_meta", arrow::large_binary());
+    EXPECT_FALSE(BlobUtils::IsBlobField(large_binary_plain));
+
+    // Right physical type, but metadata without the blob marker.
+    auto unrelated_metadata = std::make_shared<arrow::KeyValueMetadata>(
+        std::unordered_map<std::string, std::string>{{"other_key", "value"}});
+    auto large_binary_unrelated =
+        arrow::field("b_wrong_meta", arrow::large_binary(), false, unrelated_metadata);
+    EXPECT_FALSE(BlobUtils::IsBlobField(large_binary_unrelated));
+}
+
+TEST_F(BlobUtilsTest, SeparateBlobSchema) {
+    auto int_column = arrow::field("f1_int", arrow::int32());
+    auto string_column = arrow::field("f2_string", arrow::utf8());
+    std::shared_ptr<arrow::Field> blob_column = BlobUtils::ToArrowField("f3_blob_1", true);
+    {
+        // Mixed schema: blob and non-blob fields end up on their respective sides.
+        std::shared_ptr<arrow::Schema> mixed =
+            arrow::schema({int_column, string_column, blob_column});
+
+        BlobUtils::SeparatedSchemas split = BlobUtils::SeparateBlobSchema(mixed);
+
+        std::shared_ptr<arrow::Schema> want_main = arrow::schema({int_column, string_column});
+        ASSERT_TRUE(split.main_schema->Equals(*want_main));
+
+        std::shared_ptr<arrow::Schema> want_blob = arrow::schema({blob_column});
+        ASSERT_TRUE(split.blob_schema->Equals(*want_blob));
+    }
+    {
+        // No blob field: the main schema equals the input, the blob schema is empty.
+        std::shared_ptr<arrow::Schema> blob_free = arrow::schema({int_column, string_column});
+        BlobUtils::SeparatedSchemas split = BlobUtils::SeparateBlobSchema(blob_free);
+        ASSERT_TRUE(split.main_schema->Equals(*blob_free));
+        ASSERT_EQ(split.blob_schema->num_fields(), 0);
+    }
+    {
+        // Only a blob field: the blob schema equals the input, the main schema is empty.
+        std::shared_ptr<arrow::Schema> blob_only = arrow::schema({blob_column});
+        BlobUtils::SeparatedSchemas split = BlobUtils::SeparateBlobSchema(blob_only);
+        ASSERT_TRUE(split.blob_schema->Equals(*blob_only));
+        ASSERT_EQ(split.main_schema->num_fields(), 0);
+    }
+}
+
+TEST_F(BlobUtilsTest, SeparateBlobArray) {
+    // Struct layout under test: (int32, blob, utf8) with three rows.
+    auto int_column = arrow::field("f1_int", arrow::int32());
+    std::shared_ptr<arrow::Field> blob_column = BlobUtils::ToArrowField("f2_blob", false);
+    auto string_column = arrow::field("f3_string", arrow::utf8());
+    auto input_schema = arrow::schema({int_column, blob_column, string_column});
+
+    arrow::Int32Builder int_values_builder;
+    ASSERT_TRUE(int_values_builder.AppendValues({1, 2, 3}).ok());
+    auto int_values = int_values_builder.Finish().ValueOrDie();
+
+    arrow::StringBuilder string_values_builder;
+    ASSERT_TRUE(string_values_builder.AppendValues({"a", "b", "c"}).ok());
+    auto string_values = string_values_builder.Finish().ValueOrDie();
+
+    arrow::LargeBinaryBuilder blob_values_builder;
+    ASSERT_TRUE(blob_values_builder.Append("1", 1).ok());
+    ASSERT_TRUE(blob_values_builder.Append("2", 1).ok());
+    ASSERT_TRUE(blob_values_builder.Append("3", 1).ok());
+    auto blob_values = blob_values_builder.Finish().ValueOrDie();
+
+    auto assembled =
+        arrow::StructArray::Make({int_values, blob_values, string_values}, input_schema->fields())
+            .ValueOrDie();
+
+    std::shared_ptr<arrow::StructArray> input_struct =
+        std::static_pointer_cast<arrow::StructArray>(assembled);
+
+    ASSERT_OK_AND_ASSIGN(auto split, BlobUtils::SeparateBlobArray(input_struct));
+
+    // The main side keeps the int and string columns, in their original order.
+    std::shared_ptr<arrow::DataType> want_main_type = arrow::struct_({int_column, string_column});
+    ASSERT_TRUE(split.main_array->type()->Equals(*want_main_type));
+    ASSERT_EQ(split.main_array->num_fields(), 2);
+    ASSERT_TRUE(split.main_array->field(0)->Equals(*int_values));
+    ASSERT_TRUE(split.main_array->field(1)->Equals(*string_values));
+
+    // The blob side holds the single blob column.
+    std::shared_ptr<arrow::DataType> want_blob_type = arrow::struct_({blob_column});
+    ASSERT_TRUE(split.blob_array->type()->Equals(*want_blob_type));
+    ASSERT_EQ(split.blob_array->num_fields(), 1);
+    ASSERT_TRUE(split.blob_array->field(0)->Equals(*blob_values));
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/data/decimal.cpp 
b/src/paimon/common/data/decimal.cpp
new file mode 100644
index 0000000..c05982c
--- /dev/null
+++ b/src/paimon/common/data/decimal.cpp
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/data/decimal.h"
+
+#include <algorithm>
+#include <cmath>
+#include <cstdlib>
+
+#include "arrow/api.h"
+#include "arrow/scalar.h"
+#include "arrow/util/decimal.h"
+#include "paimon/io/byte_order.h"
+#include "paimon/memory/bytes.h"
+
+namespace paimon {
+// Powers of ten from 10^0 up to 10^18, indexed by exponent. The scaling helpers in this file
+// use the table to multiply or divide by at most MAX_COMPACT_PRECISION digits per step.
+const int64_t Decimal::POWERS_OF_TEN[MAX_COMPACT_PRECISION + 1] = {
+    1,
+    10,
+    100,
+    1'000,
+    10'000,
+    100'000,
+    1'000'000,
+    10'000'000,
+    100'000'000,
+    1'000'000'000,
+    10'000'000'000,
+    100'000'000'000,
+    1'000'000'000'000,
+    10'000'000'000'000,
+    100'000'000'000'000,
+    1'000'000'000'000'000,
+    10'000'000'000'000'000,
+    100'000'000'000'000'000,
+    1'000'000'000'000'000'000,
+};
+
+// Largest int128 value: high 64 bits are 0x7FFF...F, low 64 bits are all ones.
+const Decimal::int128_t Decimal::INT128_MAXIMUM_VALUE =
+    (static_cast<Decimal::int128_t>(0x7FFFFFFFFFFFFFFF) << 64) |
+    static_cast<Decimal::int128_t>(0xFFFFFFFFFFFFFFFF);
+
+// Smallest int128 value: only the top (sign) bit is set.
+const Decimal::int128_t Decimal::INT128_MINIMUM_VALUE =
+    (static_cast<Decimal::int128_t>(0x8000000000000000) << 64);
+
+// Renders this decimal as text by wrapping the unscaled 128-bit value in an Arrow
+// Decimal128Scalar typed with this decimal's precision and scale, and delegating to Arrow.
+std::string Decimal::ToString() const {
+    const arrow::Decimal128 unscaled(HighBits(), LowBits());
+    const auto decimal_type = arrow::decimal128(Precision(), Scale());
+    return arrow::Decimal128Scalar(unscaled, decimal_type).ToString();
+}
+
+// Serializes the unscaled value as a minimal big-endian two's-complement byte array (most
+// significant byte first), matching the layout the Java side produces for BigInteger.
+//
+// Returns between 1 and 16 bytes. Redundant sign bytes (leading 0x00 for non-negative values,
+// leading 0xFF for negative ones) are dropped, but one sign byte is kept whenever the top bit of
+// the remaining bytes would otherwise flip the sign.
+//
+// The bytes are extracted arithmetically rather than with memcpy, so the result does not depend
+// on the byte order of the host.
+std::vector<char> Decimal::ToUnscaledBytes() const {
+    const bool positive = value_ >= 0;
+    const int32_t redundant_bytes =
+        positive ? count_leading_zero_bytes(value_) : count_leading_all_ones_bytes(value_);
+    int32_t valid_bytes = static_cast<int32_t>(sizeof(value_)) - redundant_bytes;
+    const auto unsigned_value = static_cast<Decimal::uint128_t>(value_);
+
+    if (valid_bytes == 0) {
+        // value_ is 0 (every byte 0x00) or -1 (every byte 0xFF): emit a single byte.
+        valid_bytes = 1;
+    } else {
+        // Java's BigInteger uses the highest bit of the first byte as the sign, e.g. positive 0xFF
+        // is encoded as [0x00, 0xFF]. Add one sign byte when that bit disagrees with the sign.
+        const bool top_bit_set = ((unsigned_value >> ((valid_bytes - 1) * 8)) & 0x80) != 0;
+        if (positive == top_bit_set) {
+            valid_bytes += 1;
+        }
+    }
+
+    // valid_bytes never exceeds 16 here: a full-width value already carries the correct sign bit,
+    // so every shift below stays under 128 bits.
+    std::vector<char> bytes(valid_bytes);
+    for (int32_t i = 0; i < valid_bytes; ++i) {
+        const int32_t shift = (valid_bytes - 1 - i) * 8;
+        bytes[i] = static_cast<char>(static_cast<uint8_t>(unsigned_value >> shift));
+    }
+    return bytes;
+}
+
+// Builds a Decimal from a big-endian two's-complement byte array (most significant byte first),
+// i.e. the inverse of ToUnscaledBytes().
+//
+// precision / scale: forwarded unchanged to the Decimal constructor.
+// bytes: the encoded unscaled value. The buffer is only read; it is no longer reversed in place.
+//
+// An empty buffer yields an unscaled value of 0 instead of reading out of bounds. If more than
+// 16 bytes are supplied, only the low 128 bits are kept.
+Decimal Decimal::FromUnscaledBytes(int32_t precision, int32_t scale, Bytes* bytes) {
+    const size_t size = bytes->size();
+    if (size == 0) {
+        return Decimal(precision, scale, static_cast<Decimal::int128_t>(0));
+    }
+    // The top bit of the first (most significant) byte is the sign. Pre-filling the accumulator
+    // with ones sign-extends negative inputs that are shorter than 16 bytes.
+    const bool negative = (static_cast<uint8_t>((*bytes)[0]) & 0x80) != 0;
+    Decimal::uint128_t value = negative ? ~static_cast<Decimal::uint128_t>(0) : 0;
+    for (size_t i = 0; i < size; ++i) {
+        // Unsigned shifting is well defined and independent of host byte order.
+        value = (value << 8) | static_cast<uint8_t>((*bytes)[i]);
+    }
+    return Decimal(precision, scale, static_cast<Decimal::int128_t>(value));
+}
+
+// Counts the leading zero bits of a 128-bit unsigned value; returns 128 when u is 0.
+//
+// __builtin_clzll has an undefined result for a zero argument, so each 64-bit half is tested
+// before the builtin is applied to it.
+int32_t Decimal::clz_u128(uint128_t u) {
+    const auto hi = static_cast<uint64_t>(u >> 64);
+    if (hi != 0) {
+        return __builtin_clzll(hi);
+    }
+    const auto lo = static_cast<uint64_t>(u);
+    if (lo != 0) {
+        return 64 + __builtin_clzll(lo);
+    }
+    return 128;
+}
+
+// Returns how many of the most significant bytes of u are 0x00 (16 when u itself is 0).
+int32_t Decimal::count_leading_zero_bytes(uint128_t u) {
+    // Whole zero bytes are the leading zero bits divided by eight, rounded down.
+    return u == 0 ? static_cast<int32_t>(sizeof(u)) : clz_u128(u) / 8;
+}
+
+// Returns how many of the most significant bytes of u are 0xFF, stopping at the first byte that
+// is not (so 0 for u == 0 and 16 when every byte is 0xFF).
+int32_t Decimal::count_leading_all_ones_bytes(uint128_t u) {
+    constexpr int32_t kTotalBytes = sizeof(uint128_t);
+    int32_t ones_bytes = 0;
+    // Walk from the most significant byte towards the least significant one.
+    while (ones_bytes < kTotalBytes &&
+           ((u >> ((kTotalBytes - 1 - ones_bytes) * 8)) & 0xFF) == 0xFF) {
+        ++ones_bytes;
+    }
+    return ones_bytes;
+}
+
+// Divides value by 10^scale using truncating integer division. The division is applied in
+// chunks of at most MAX_COMPACT_PRECISION digits because POWERS_OF_TEN only reaches that far.
+// A non-positive scale returns value unchanged.
+Decimal::int128_t Decimal::DownScaleInt128(Decimal::int128_t value, int32_t scale) {
+    for (int32_t remaining = scale; remaining > 0;) {
+        const int32_t digits = std::min(remaining, MAX_COMPACT_PRECISION);
+        value /= POWERS_OF_TEN[digits];
+        remaining -= digits;
+    }
+    return value;
+}
+
+// Multiplies value by 10^scale in chunks of at most MAX_COMPACT_PRECISION digits.
+//
+// *overflow is always written: false on success, true when a step would leave the int128 range.
+// On overflow the result saturates to INT128_MAXIMUM_VALUE (positive value) or
+// INT128_MINIMUM_VALUE (negative value). A non-positive scale returns value unchanged.
+Decimal::int128_t Decimal::ScaleInt128(Decimal::int128_t value, int32_t scale, bool* overflow) {
+    *overflow = false;
+    int32_t remaining = scale;
+    while (remaining > 0) {
+        const int32_t digits = std::min(remaining, MAX_COMPACT_PRECISION);
+        const int64_t factor = POWERS_OF_TEN[digits];
+        // Compare against the limit divided by the factor so the check itself cannot overflow.
+        if (value > 0) {
+            if (value > INT128_MAXIMUM_VALUE / factor) {
+                *overflow = true;
+                return INT128_MAXIMUM_VALUE;
+            }
+        } else if (value < 0) {
+            if (value < INT128_MINIMUM_VALUE / factor) {
+                *overflow = true;
+                return INT128_MINIMUM_VALUE;
+            }
+        }
+        value *= factor;
+        remaining -= digits;
+    }
+    return value;
+}
+
+// Three-way numeric comparison that works across different scales.
+//
+// Returns -1, 0 or 1 when this decimal is less than, equal to, or greater than other.
+// The comparison proceeds in three stages: sign, integral part, then fractional part, with the
+// fractional parts brought to the larger of the two scales first.
+int32_t Decimal::CompareTo(const Decimal& other) const {
+    // Stage 1: a non-negative value always sorts after a negative one.
+    const bool lhs_non_negative = value_ >= 0;
+    const bool rhs_non_negative = other.value_ >= 0;
+    if (lhs_non_negative != rhs_non_negative) {
+        return lhs_non_negative ? 1 : -1;
+    }
+
+    // Stage 2: compare the integral parts (unscaled value with the fractional digits removed).
+    const Decimal::int128_t lhs_integral = DownScaleInt128(value_, scale_);
+    const Decimal::int128_t rhs_integral = DownScaleInt128(other.value_, other.scale_);
+    if (lhs_integral != rhs_integral) {
+        return lhs_integral < rhs_integral ? -1 : 1;
+    }
+
+    // Stage 3: integral parts match, so isolate the fractional digits of each side. Scaling the
+    // integral part back up cannot exceed the original value, so overflow is not checked here.
+    bool overflow = false;
+    Decimal::int128_t lhs_fraction = value_ - ScaleInt128(lhs_integral, scale_, &overflow);
+    Decimal::int128_t rhs_fraction =
+        other.value_ - ScaleInt128(rhs_integral, other.scale_, &overflow);
+
+    // Bring the fraction with the smaller scale up to the larger scale. If that overflows, the
+    // rescaled side has the larger magnitude, and the shared sign decides the ordering.
+    const int32_t scale_gap = scale_ - other.scale_;
+    if (scale_gap > 0) {
+        rhs_fraction = ScaleInt128(rhs_fraction, scale_gap, &overflow);
+        if (overflow) {
+            return lhs_non_negative ? -1 : 1;
+        }
+    } else {
+        lhs_fraction = ScaleInt128(lhs_fraction, -scale_gap, &overflow);
+        if (overflow) {
+            return lhs_non_negative ? 1 : -1;
+        }
+    }
+
+    if (lhs_fraction == rhs_fraction) {
+        return 0;
+    }
+    return lhs_fraction < rhs_fraction ? -1 : 1;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/data/decimal_test.cpp 
b/src/paimon/common/data/decimal_test.cpp
new file mode 100644
index 0000000..038b3d3
--- /dev/null
+++ b/src/paimon/common/data/decimal_test.cpp
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/data/decimal.h"
+
+#include <memory>
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/common/utils/decimal_utils.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/io/data_input_stream.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+TEST(DecimalTest, TestSimple) {  // round-trips fixed values through To/FromUnscaledBytes
+    auto CheckResult = [](const Decimal& decimal, const std::vector<uint8_t>& 
bytes) {
+        auto pool = GetDefaultPool();
+        // prepare java bytes (the expected encoding, copied into a pool-backed Bytes buffer)
+        auto java_bytes = Bytes::AllocateBytes(bytes.size(), pool.get());
+        memcpy(java_bytes->data(), bytes.data(), bytes.size());
+        // prepare result cpp bytes
+        auto cpp_serialized_bytes = decimal.ToUnscaledBytes();
+
+        // check serialized bytes equal
+        ASSERT_EQ(std::vector<char>(java_bytes->data(), java_bytes->data() + 
java_bytes->size()),
+                  cpp_serialized_bytes);
+        // check deserialize equal
+        auto decimal2 = Decimal::FromUnscaledBytes(38, 38, java_bytes.get());
+        ASSERT_EQ(decimal, decimal2);
+    };
+    {
+        Decimal decimal(38, 38, 
DecimalUtils::StrToInt128("12345678998765432145678").value());
+        std::vector<uint8_t> java_bytes = {0x2, 0x9d, 0x42, 0xb6, 0xa7, 0x2a, 
0x9d, 0xc7, 0x7, 0xe};
+        CheckResult(decimal, java_bytes);
+    }
+    {
+        Decimal decimal(38, 38, 
DecimalUtils::StrToInt128("-12345678998765432145678").value());
+        std::vector<uint8_t> java_bytes = {0xfd, 0x62, 0xbd, 0x49, 0x58,
+                                           0xd5, 0x62, 0x38, 0xf8, 0xf2};
+        CheckResult(decimal, java_bytes);
+    }
+    {
+        Decimal decimal(38, 38, DecimalUtils::StrToInt128("0").value());
+        std::vector<uint8_t> java_bytes = {0x0};  // zero still occupies one byte
+        CheckResult(decimal, java_bytes);
+    }
+    {
+        Decimal decimal(38, 38, DecimalUtils::StrToInt128("1").value());
+        std::vector<uint8_t> java_bytes = {0x1};
+        CheckResult(decimal, java_bytes);
+    }
+    {
+        Decimal decimal(38, 38, DecimalUtils::StrToInt128("-1").value());
+        std::vector<uint8_t> java_bytes = {0xff};  // all-ones value collapses to one 0xFF byte
+        CheckResult(decimal, java_bytes);
+    }
+    {
+        Decimal decimal(38, 38, DecimalUtils::StrToInt128("128").value());
+        std::vector<uint8_t> java_bytes = {0x0, 0x80};  // leading 0x00 keeps 0x80 positive
+        CheckResult(decimal, java_bytes);
+    }
+    {
+        Decimal decimal(38, 38, Decimal::INT128_MINIMUM_VALUE);  // full 16-byte negative bound
+        std::vector<uint8_t> java_bytes = {0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x00, 0x00,
+                                           0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x00, 0x00};
+        CheckResult(decimal, java_bytes);
+    }
+    {
+        Decimal decimal(38, 38, Decimal::INT128_MAXIMUM_VALUE);  // full 16-byte positive bound
+        std::vector<uint8_t> java_bytes = {0x7f, 0xff, 0xff, 0xff, 0xff, 0xff, 
0xff, 0xff,
+                                           0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 
0xff, 0xff};
+        CheckResult(decimal, java_bytes);
+    }
+}
+
+TEST(DecimalTest, TestCompatibleWithJava) {  // replays records from the decimal_bytes.data fixture
+    auto pool = GetDefaultPool();
+    auto file_system = std::make_unique<LocalFileSystem>();
+    auto file_name = paimon::test::GetDataDir() + "/decimal_bytes.data";
+    uint64_t file_length = 
file_system->GetFileStatus(file_name).value()->GetLen();
+    ASSERT_GT(file_length, 0);  // NOTE(review): the .value() above is not status-checked first
+    ASSERT_OK_AND_ASSIGN(auto input_stream, file_system->Open(file_name));
+    auto data_bytes = Bytes::AllocateBytes(file_length, pool.get());
+    ASSERT_OK(input_stream->Read(data_bytes->data(), file_length));
+    auto byte_array_input_stream =
+        std::make_shared<ByteArrayInputStream>(data_bytes->data(), 
file_length);
+    DataInputStream data_input_stream(byte_array_input_stream);
+    for (int32_t i = 0; i < 2000; i++) {  // the loop reads exactly 2000 (string, bytes) records
+        // read decimal str (int32 length prefix followed by that many characters)
+        ASSERT_OK_AND_ASSIGN(int32_t decimal_str_len, 
data_input_stream.ReadValue<int32_t>());
+        auto decimal_str = Bytes::AllocateBytes(decimal_str_len, pool.get());
+        ASSERT_OK(data_input_stream.ReadBytes(decimal_str.get()));
+
+        // read decimal serialized bytes from java (same length-prefixed layout)
+        ASSERT_OK_AND_ASSIGN(int32_t decimal_bytes_len, 
data_input_stream.ReadValue<int32_t>());
+        auto decimal_bytes = Bytes::AllocateBytes(decimal_bytes_len, 
pool.get());
+        ASSERT_OK(data_input_stream.ReadBytes(decimal_bytes.get()));
+
+        // check result: serialize must equal the recorded bytes, and deserialize must round-trip
+        auto str = std::string(decimal_str->data(), decimal_str->size());
+        Decimal decimal(/*precision=*/29, /*scale=*/29, 
DecimalUtils::StrToInt128(str).value());
+        auto serialized_bytes = decimal.ToUnscaledBytes();
+        ASSERT_EQ(
+            std::vector<char>(decimal_bytes->data(), decimal_bytes->data() + 
decimal_bytes->size()),
+            serialized_bytes);
+        auto decimal2 =
+            Decimal::FromUnscaledBytes(/*precision=*/29, /*scale=*/29, 
decimal_bytes.get());
+        ASSERT_EQ(decimal, decimal2);
+    }
+}
+
+TEST(DecimalTest, TestCompareTo) {  // covers CompareTo and the relational operators
+    auto CheckResult = [](const Decimal& decimal1, const Decimal& decimal2) {  // needs d1 < d2
+        ASSERT_FALSE(decimal1 < decimal1);
+        ASSERT_FALSE(decimal1 > decimal1);
+        ASSERT_EQ(decimal1, decimal1);
+
+        ASSERT_EQ(decimal1.CompareTo(decimal2), -1);
+        ASSERT_LT(decimal1, decimal2);
+        ASSERT_EQ(decimal2.CompareTo(decimal1), 1);
+        ASSERT_GT(decimal2, decimal1);
+        auto decimal3 = decimal1;
+        ASSERT_EQ(decimal3.CompareTo(decimal1), 0);
+        ASSERT_EQ(decimal3, decimal1);
+
+        Decimal negative_decimal1(decimal1.Precision(), decimal1.Scale(), 
-decimal1.Value());
+        Decimal negative_decimal2(decimal2.Precision(), decimal2.Scale(), 
-decimal2.Value());
+        ASSERT_EQ(negative_decimal1.CompareTo(negative_decimal2), 1);  // negation flips the order
+        ASSERT_GT(negative_decimal1, negative_decimal2);
+        ASSERT_EQ(negative_decimal2.CompareTo(negative_decimal1), -1);
+        ASSERT_LT(negative_decimal2, negative_decimal1);
+        auto negative_decimal3 = negative_decimal1;
+        ASSERT_EQ(negative_decimal3.CompareTo(negative_decimal1), 0);
+        ASSERT_EQ(negative_decimal3, negative_decimal1);
+    };
+
+    auto CheckEqual = [](const Decimal& decimal1, const Decimal& decimal2) {  // needs d1 == d2
+        ASSERT_EQ(decimal1.CompareTo(decimal2), 0);
+        ASSERT_EQ(decimal2.CompareTo(decimal1), 0);
+
+        Decimal negative_decimal1(decimal1.Precision(), decimal1.Scale(), 
-decimal1.Value());
+        Decimal negative_decimal2(decimal2.Precision(), decimal2.Scale(), 
-decimal2.Value());
+        ASSERT_EQ(negative_decimal1.CompareTo(negative_decimal2), 0);
+        ASSERT_EQ(negative_decimal2.CompareTo(negative_decimal1), 0);
+    };
+
+    // same scales
+    {
+        Decimal decimal1(23, 0, DecimalUtils::StrToInt128("99").value());
+        Decimal decimal2(23, 0, DecimalUtils::StrToInt128("100").value());
+        CheckResult(decimal1, decimal2);
+    }
+    {
+        Decimal decimal1(23, 5, DecimalUtils::StrToInt128("34543").value());
+        Decimal decimal2(23, 5, DecimalUtils::StrToInt128("4324324").value());
+        CheckResult(decimal1, decimal2);
+    }
+    {
+        Decimal decimal1(23, 15, 
DecimalUtils::StrToInt128("345345435432").value());
+        Decimal decimal2(23, 15, 
DecimalUtils::StrToInt128("345344425435432").value());
+        CheckResult(decimal1, decimal2);
+    }
+    {
+        Decimal decimal1(23, 20, DecimalUtils::StrToInt128("5").value());
+        Decimal decimal2(23, 20, DecimalUtils::StrToInt128("50").value());
+        CheckResult(decimal1, decimal2);
+    }
+
+    // different scales
+    {
+        Decimal decimal1(23, 4, DecimalUtils::StrToInt128("10000").value());
+        Decimal decimal2(23, 3, DecimalUtils::StrToInt128("10000").value());
+        CheckResult(decimal1, decimal2);
+    }
+    {
+        Decimal decimal1(23, 2, DecimalUtils::StrToInt128("111").value());
+        Decimal decimal2(23, 3, DecimalUtils::StrToInt128("1111").value());
+        CheckResult(decimal1, decimal2);
+    }
+    {  // NOTE(review): both operands use scale 5 although this group is labelled different scales
+        Decimal decimal1(23, 5, DecimalUtils::StrToInt128("999999").value());
+        Decimal decimal2(23, 5, DecimalUtils::StrToInt128("9999999").value());
+        CheckResult(decimal1, decimal2);
+    }
+    {
+        Decimal decimal1(23, 1, DecimalUtils::StrToInt128("100").value());
+        Decimal decimal2(23, 0, DecimalUtils::StrToInt128("99").value());
+        CheckResult(decimal1, decimal2);
+    }
+
+    // same integral parts
+    {
+        Decimal decimal1(23, 0, DecimalUtils::StrToInt128("99999").value());
+        Decimal decimal2(23, 1, DecimalUtils::StrToInt128("999999").value());
+        CheckResult(decimal1, decimal2);
+    }
+    {
+        Decimal decimal1(23, 3, DecimalUtils::StrToInt128("12345123").value());
+        Decimal decimal2(23, 5, 
DecimalUtils::StrToInt128("1234553432").value());
+        CheckResult(decimal1, decimal2);
+    }
+
+    // equal numbers
+    {
+        Decimal decimal1(23, 3, DecimalUtils::StrToInt128("100000").value());
+        Decimal decimal2(23, 0, DecimalUtils::StrToInt128("100").value());
+        CheckEqual(decimal1, decimal2);
+    }
+    {
+        Decimal decimal1(23, 3, DecimalUtils::StrToInt128("100000").value());
+        Decimal decimal2(23, 3, DecimalUtils::StrToInt128("100000").value());
+        CheckEqual(decimal1, decimal2);
+    }
+    {
+        Decimal decimal1(23, 10, DecimalUtils::StrToInt128("1").value());
+        Decimal decimal2(23, 11, DecimalUtils::StrToInt128("10").value());
+        CheckEqual(decimal1, decimal2);
+    }
+
+    // large scales (>18)
+    {
+        Decimal decimal1(128, 35, DecimalUtils::StrToInt128("99").value());
+        Decimal decimal2(128, 35, DecimalUtils::StrToInt128("100").value());
+        CheckResult(decimal1, decimal2);
+    }
+    {
+        Decimal decimal1(
+            128, 29, 
DecimalUtils::StrToInt128("12345678999999999999999999999999999998").value());
+        Decimal decimal2(
+            128, 30, 
DecimalUtils::StrToInt128("123456789999999999999999999999999999999").value());
+        CheckResult(decimal1, decimal2);
+    }
+    {
+        Decimal decimal1(
+            128, 30, 
DecimalUtils::StrToInt128("123456789999999999999999999999999999900").value());
+        Decimal decimal2(
+            128, 29, 
DecimalUtils::StrToInt128("12345678999999999999999999999999999990").value());
+        CheckEqual(decimal1, decimal2);
+    }
+
+    // minimum and maximum
+    {
+        Decimal decimal1(128, 39, Decimal::INT128_MAXIMUM_VALUE);
+        Decimal decimal2(
+            128, 39, 
DecimalUtils::StrToInt128("170141183460469231731687303715884105727").value());
+        CheckEqual(decimal1, decimal2);
+    }
+    {  // NOTE(review): CheckEqual negates both values; negating the int128 minimum overflows
+        Decimal decimal1(128, 39, Decimal::INT128_MINIMUM_VALUE);
+        Decimal decimal2(
+            128, 39, 
DecimalUtils::StrToInt128("-170141183460469231731687303715884105728").value());
+        CheckEqual(decimal1, decimal2);
+    }
+
+    // fractional overflow
+    {
+        Decimal decimal1(128, 39, Decimal::INT128_MAXIMUM_VALUE);
+        Decimal decimal2(
+            128, 38, 
DecimalUtils::StrToInt128("99999999999999999999999999999999999999").value());
+        CheckResult(decimal1, decimal2);
+    }
+    {  // NOTE(review): CheckResult negates decimal2; negating the int128 minimum overflows
+        Decimal decimal1(
+            128, 38, 
DecimalUtils::StrToInt128("-99999999999999999999999999999999999999").value());
+        Decimal decimal2(128, 39, Decimal::INT128_MINIMUM_VALUE);
+        CheckResult(decimal1, decimal2);
+    }
+}
+}  // namespace paimon::test
diff --git a/src/paimon/common/data/timestamp.cpp 
b/src/paimon/common/data/timestamp.cpp
new file mode 100644
index 0000000..3f4a313
--- /dev/null
+++ b/src/paimon/common/data/timestamp.cpp
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/data/timestamp.h"
+
+#include <ctime>
+#include <iomanip>
+#include <sstream>
+
+namespace paimon {
+// Out-of-class definitions of the Timestamp precision constants.
+// NOTE(review): the values look like counts of fractional-second digits (3 = millis,
+// 6 = micros, 9 = nanos) -- confirm against the declarations in paimon/data/timestamp.h.
+const int32_t Timestamp::DEFAULT_PRECISION{6};
+const int32_t Timestamp::MILLIS_PRECISION{3};
+const int32_t Timestamp::MAX_PRECISION{9};
+const int32_t Timestamp::MIN_PRECISION{0};
+const int32_t Timestamp::MAX_COMPACT_PRECISION{3};
+
+// Converts the timestamp to microseconds: the millisecond part times 1000, plus the
+// nano-of-millisecond part divided by 1000 (integer division drops sub-microsecond nanos).
+int64_t Timestamp::ToMicrosecond() const {
+    constexpr int64_t kMicrosPerMilli = 1000;
+    constexpr int64_t kNanosPerMicro = 1000;
+    const int64_t whole_micros = millisecond_ * kMicrosPerMilli;
+    const int64_t micros_within_milli = nano_of_millisecond_ / kNanosPerMicro;
+    return whole_micros + micros_within_milli;
+}
+
+// Formats the timestamp as "YYYY-MM-DD HH:MM:SS.nnnnnnnnn" using gmtime_r, i.e. in UTC, with a
+// nine-digit nanosecond fraction.
+std::string Timestamp::ToString() const {
+    constexpr int64_t kMillisPerSecond = 1000;
+    constexpr int64_t kNanosPerMilli = 1000000;
+    constexpr int64_t kNanosPerSecond = 1000000000;
+    // Length of a fully padded result: 19 date/time characters, '.', and 9 fraction digits.
+    constexpr size_t kFullWidth = 29;
+
+    time_t epoch_seconds = millisecond_ / kMillisPerSecond;
+    int64_t nano_of_second =
+        (millisecond_ % kMillisPerSecond) * kNanosPerMilli + nano_of_millisecond_;
+    // Division and remainder truncate toward zero, so a pre-epoch instant can give a negative
+    // remainder; borrow one second to keep the fraction non-negative.
+    if (nano_of_second < 0) {
+        --epoch_seconds;
+        nano_of_second += kNanosPerSecond;
+    }
+
+    std::tm broken_down;
+    ::gmtime_r(&epoch_seconds, &broken_down);
+
+    std::stringstream formatted;
+    formatted << std::put_time(&broken_down, "%Y-%m-%d %H:%M:%S") << '.' << std::setfill('0')
+              << std::setw(9) << nano_of_second;
+
+    // put_time() does not zero-pad years shorter than four digits (year 1 prints as "1"), so
+    // left-pad the whole string with '0' up to the full width.
+    std::string text = formatted.str();
+    if (text.length() < kFullWidth) {
+        text.insert(0, kFullWidth - text.length(), '0');
+    }
+    return text;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/data/timestamp_test.cpp 
b/src/paimon/common/data/timestamp_test.cpp
new file mode 100644
index 0000000..886f087
--- /dev/null
+++ b/src/paimon/common/data/timestamp_test.cpp
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/data/timestamp.h"
+
+#include "gtest/gtest.h"
+
+namespace paimon::test {
+
+class TimestampTest : public ::testing::Test {  // fixture sharing one sample timestamp
+ protected:
+    void SetUp() override {
+        timestamp_ = Timestamp(1622547800000, 123456);  // 2021-06-01 
11:43:20.000123456
+    }
+
+    Timestamp timestamp_;
+};
+
+TEST_F(TimestampTest, GetMillisecond) {
+    ASSERT_EQ(timestamp_.GetMillisecond(), 1622547800000);
+}
+
+TEST_F(TimestampTest, GetNanoOfMillisecond) {
+    ASSERT_EQ(timestamp_.GetNanoOfMillisecond(), 123456);
+}
+
+TEST_F(TimestampTest, ToNanosecond) {  // millis * 1e6 + nano-of-millisecond
+    ASSERT_EQ(timestamp_.ToNanosecond(), 1622547800000123456ll);
+}
+
+TEST_F(TimestampTest, FromEpochMillis) {
+    Timestamp ts1 = Timestamp::FromEpochMillis(1622547800000);  // nano part defaults to 0
+    ASSERT_EQ(ts1.GetMillisecond(), 1622547800000);
+    ASSERT_EQ(ts1.GetNanoOfMillisecond(), 0);
+
+    Timestamp ts2 = Timestamp::FromEpochMillis(1622547800000, 123456);
+    ASSERT_EQ(ts2.GetMillisecond(), 1622547800000);
+    ASSERT_EQ(ts2.GetNanoOfMillisecond(), 123456);
+}
+
+TEST_F(TimestampTest, ToMillisTimestamp) {  // keeps the millis, clears the nano part
+    Timestamp ts = timestamp_.ToMillisTimestamp();
+    ASSERT_EQ(ts.GetMillisecond(), 1622547800000);
+    ASSERT_EQ(ts.GetNanoOfMillisecond(), 0);
+}
+
+TEST_F(TimestampTest, IsCompact) {  // precision 3 is compact, precision 6 is not
+    ASSERT_TRUE(Timestamp::IsCompact(3));
+    ASSERT_FALSE(Timestamp::IsCompact(6));
+}
+
+TEST_F(TimestampTest, EqualityOperator) {
+    Timestamp ts1(1622547800000, 123456);
+    Timestamp ts2(1622547800000, 123456);
+    Timestamp ts3(1622547800000, 654321);  // differs only in the nano part
+    ASSERT_EQ(ts1, ts1);
+    ASSERT_EQ(ts1, ts2);
+    ASSERT_NE(ts1, ts3);
+}
+
+TEST_F(TimestampTest, LessThanOperator) {
+    Timestamp ts1(1622547800000, 123456);
+    Timestamp ts2(1622547800000, 654321);  // same millis, larger nano part
+    Timestamp ts3(1622547800001, 123456);  // larger millis, same nano part
+    ASSERT_LT(ts1, ts2);
+    ASSERT_LT(ts1, ts3);
+    ASSERT_FALSE(ts2 < ts1);
+    ASSERT_FALSE(ts3 < ts1);
+}
+
+TEST_F(TimestampTest, ToString) {
+    ASSERT_EQ(timestamp_.ToString(), "2021-06-01 11:43:20.000123456");
+    ASSERT_EQ(Timestamp(-1, 0).ToString(), "1969-12-31 23:59:59.999000000");  // pre-epoch borrow
+    ASSERT_EQ(Timestamp(-62109569749000l, 0).ToString(), "0001-10-29 
05:44:11.000000000");
+}
+
+TEST_F(TimestampTest, TestToMicrosecond) {
+    {
+        Timestamp ts(1622547800000, 123456);
+        ASSERT_EQ(1622547800000123l, ts.ToMicrosecond());  // 123456 ns contributes 123 us
+    }
+    {
+        Timestamp ts(-16225478, 123456);  // negative millis with a positive nano part
+        ASSERT_EQ(-16225477877, ts.ToMicrosecond());
+    }
+}
+}  // namespace paimon::test
diff --git a/src/paimon/common/logging/logging.cpp 
b/src/paimon/common/logging/logging.cpp
new file mode 100644
index 0000000..5e8a3f1
--- /dev/null
+++ b/src/paimon/common/logging/logging.cpp
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/logging.h"
+
+#include <cerrno>
+#include <cstdarg>
+#include <cstdio>
+#include <mutex>
+#include <optional>
+#include <shared_mutex>
+
+#include "glog/log_severity.h"
+#include "glog/logging.h"
+#include "glog/raw_logging.h"
+
+namespace paimon {
+
+// Returns the process-wide slot holding the user-registered logger factory; the slot is empty
+// until Logger::RegisterLogger() stores one. Kept as a function-local static so it is
+// constructed on first use.
+static std::optional<Logger::LoggerCreator>& getLoggerCreator() {
+    static std::optional<Logger::LoggerCreator> creator_slot = std::nullopt;
+    return creator_slot;
+}
+
+// Returns the process-wide reader/writer mutex used by RegisterLogger() and GetLogger() in
+// this file. Kept as a function-local static so it is constructed on first use.
+static std::shared_mutex& getRegistryLock() {
+    static std::shared_mutex registry_guard;
+    return registry_guard;
+}
+
+// Installs `creator` as the factory that GetLogger() uses from now on, replacing any factory
+// registered earlier. The registry mutex is held exclusively while the slot is written.
+void Logger::RegisterLogger(LoggerCreator creator) {
+    std::unique_lock<std::shared_mutex> exclusive(getRegistryLock());
+    auto& slot = getLoggerCreator();
+    slot = creator;
+}
+
+static google::LogSeverity ToGlogLevel(PaimonLogLevel level) {
+    switch (level) {
+        case PAIMON_LOG_LEVEL_DEBUG:
+            return google::GLOG_INFO;
+        case PAIMON_LOG_LEVEL_INFO:
+            return google::GLOG_INFO;
+        case PAIMON_LOG_LEVEL_WARN:
+            return google::GLOG_WARNING;
+        case PAIMON_LOG_LEVEL_ERROR:
+            return google::GLOG_ERROR;
+        case PAIMON_LOG_LEVEL_NONE:
+        case PAIMON_LOG_LEVEL_MAX:
+        default:
+            return google::GLOG_INFO;
+    }
+}
+
+// Default Logger implementation, used when no custom creator is registered. It forwards
+// messages to glog's raw logging entry point.
+class GlogAdaptor : public Logger {
+ public:
+    // Formats a printf-style message and emits it through glog at the mapped severity.
+    //
+    // level:    Paimon level, translated with ToGlogLevel().
+    // fname:    source file name reported to glog.
+    // lineno:   source line reported to glog.
+    // function: accepted for interface compatibility; not used by this adaptor.
+    // fmt, ...: printf-style format string and its arguments.
+    //
+    // google::RawLog__ is itself a printf-style variadic function, so it cannot consume a
+    // va_list. The message is therefore expanded with vsnprintf first and handed over through a
+    // literal "%s" format. Messages longer than the local buffer are truncated.
+    void LogV(PaimonLogLevel level, const char* fname, int lineno, const char* function,
+              const char* fmt, ...) override {
+        constexpr size_t kMaxMessageBytes = 4096;
+        char message[kMaxMessageBytes];
+
+        va_list args;
+        va_start(args, fmt);
+        const int written = std::vsnprintf(message, sizeof(message), fmt, args);
+        va_end(args);
+
+        // vsnprintf reports an encoding error with a negative result; fall back to the raw
+        // format string so the event is not lost.
+        const char* text = written < 0 ? fmt : message;
+        google::RawLog__(ToGlogLevel(level), fname, lineno, "%s", text);
+    }
+
+    // Every level is reported as enabled; this adaptor does no filtering of its own.
+    bool IsLevelEnabled(PaimonLogLevel level) const override {
+        return true;
+    }
+};
+
+// Creates a logger for `path`.
+//
+// If a creator was registered through RegisterLogger(), it is invoked with `path` and its result
+// is returned. Otherwise glog is initialized on first use and a GlogAdaptor is returned.
+//
+// The registered creator is inspected only while the registry mutex is held in shared mode, so
+// a concurrent RegisterLogger() call cannot race with the read.
+std::unique_ptr<Logger> Logger::GetLogger(const std::string& path) {
+    {
+        std::shared_lock<std::shared_mutex> lock(getRegistryLock());
+        const auto& creator = getLoggerCreator();
+        if (creator) {
+            return creator.value()(path);
+        }
+    }
+    // No custom creator: take the lock exclusively so only one thread initializes glog.
+    std::unique_lock<std::shared_mutex> ulock(getRegistryLock());
+    if (!google::IsGoogleLoggingInitialized()) {
+        google::InitGoogleLogging(program_invocation_name);
+    }
+    return std::make_unique<GlogAdaptor>();
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/logging/logging_test.cpp 
b/src/paimon/common/logging/logging_test.cpp
new file mode 100644
index 0000000..8b6e3f6
--- /dev/null
+++ b/src/paimon/common/logging/logging_test.cpp
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "paimon/logging.h"
+
+#include <future>
+#include <random>
+#include <thread>
+
+#include "paimon/common/executor/future.h"
+#include "paimon/executor.h"
+#include "paimon/testing/utils/testharness.h"
+namespace paimon::test {
+TEST(LoggerTest, TestMultiThreadGetLogger) {  // calls GetLogger concurrently from 4 threads
+    auto executor = CreateDefaultExecutor(/*thread_count=*/4);
+    auto get_logger = []() {  // task body: every call must yield a non-null logger
+        auto logger = Logger::GetLogger("my_log");
+        ASSERT_TRUE(logger);
+    };
+
+    std::vector<std::future<void>> futures;
+    for (int32_t i = 0; i < 1000; ++i) {  // schedule 1000 tasks on the executor
+        futures.push_back(Via(executor.get(), get_logger));
+    }
+    Wait(futures);  // block until all scheduled tasks have finished
+}
+}  // namespace paimon::test


Reply via email to