This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 6968ab8 feat: introduce basic data types (#10)
6968ab8 is described below
commit 6968ab8a871d300d8e14022c9c56b4c5095c87c8
Author: lszskye <[email protected]>
AuthorDate: Mon May 25 11:58:37 2026 +0800
feat: introduce basic data types (#10)
---
include/paimon/data/blob.h | 113 +++++++++
include/paimon/data/decimal.h | 146 ++++++++++++
include/paimon/data/timestamp.h | 139 +++++++++++
include/paimon/logging.h | 73 ++++++
src/paimon/common/data/blob_defs.h | 63 +++++
src/paimon/common/data/blob_descriptor.cpp | 114 +++++++++
src/paimon/common/data/blob_descriptor.h | 92 ++++++++
src/paimon/common/data/blob_descriptor_test.cpp | 228 ++++++++++++++++++
src/paimon/common/data/blob_test.cpp | 188 +++++++++++++++
src/paimon/common/data/blob_utils.cpp | 120 ++++++++++
src/paimon/common/data/blob_utils.h | 71 ++++++
src/paimon/common/data/blob_utils_test.cpp | 145 ++++++++++++
src/paimon/common/data/decimal.cpp | 222 ++++++++++++++++++
src/paimon/common/data/decimal_test.cpp | 292 ++++++++++++++++++++++++
src/paimon/common/data/timestamp.cpp | 63 +++++
src/paimon/common/data/timestamp_test.cpp | 103 +++++++++
src/paimon/common/logging/logging.cpp | 94 ++++++++
src/paimon/common/logging/logging_test.cpp | 42 ++++
18 files changed, 2308 insertions(+)
diff --git a/include/paimon/data/blob.h b/include/paimon/data/blob.h
new file mode 100644
index 0000000..ff89714
--- /dev/null
+++ b/include/paimon/data/blob.h
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <unordered_map>
+
+#include "paimon/fs/file_system.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/result.h"
+#include "paimon/type_fwd.h"
+#include "paimon/visibility.h"
+
+struct ArrowSchema;
+
+namespace paimon {
+
+/// Represents a binary large object (BLOB) that can be read from various
sources.
+///
+/// The Blob class provides a unified interface for handling binary data from
different
+/// sources including file path and blob descriptor. It supports reading data
through input
+/// streams and provides descriptor-based serialization.
+class PAIMON_EXPORT Blob {
+ public:
+ ~Blob();
+
+ /// Creates a Blob from a file path.
+ ///
+ /// @param path The file path to create the blob from.
+ /// @return A result containing the created blob or an error.
+ static Result<std::unique_ptr<Blob>> FromPath(const std::string& path);
+
+ /// Creates a Blob from a file path with specified offset and length.
+ ///
+ /// @param path The file path to create the blob from.
+ /// @param offset The starting offset within the file.
+ /// @param length The length of data to read from the file.
+ /// @return A result containing the created blob or an error.
+ static Result<std::unique_ptr<Blob>> FromPath(const std::string& path,
int64_t offset,
+ int64_t length);
+
+ /// Creates a Blob from a blob descriptor.
+ ///
+ /// @param buffer The buffer of the blob descriptor.
+ /// @param length The length of the buffer.
+ /// @return A result containing the created blob or an error.
+ static Result<std::unique_ptr<Blob>> FromDescriptor(const char* buffer,
uint64_t length);
+
+ /// Converts the blob to a blob descriptor.
+ ///
+ /// @param pool The memory pool to use for allocation.
+ /// @return A blob descriptor bytes representing the blob.
+ PAIMON_UNIQUE_PTR<Bytes> ToDescriptor(const std::shared_ptr<MemoryPool>&
pool) const;
+
+ /// Gets the URI of the blob.
+ const std::string& Uri() const;
+
+ /// Creates an input stream for reading the blob data.
+ ///
+ /// @param fs The file system to use for reading.
+ /// @return A result containing the input stream or an error.
+ Result<std::unique_ptr<InputStream>> NewInputStream(
+ const std::shared_ptr<FileSystem>& fs) const;
+
+ /// Reads the blob data to bytes.
+ ///
+ /// @param fs The file system to use for reading.
+ /// @param pool The memory pool to use for allocation.
+ /// @return A result containing the blob data bytes or an error.
+ Result<PAIMON_UNIQUE_PTR<Bytes>> ToData(const std::shared_ptr<FileSystem>&
fs,
+ const std::shared_ptr<MemoryPool>&
pool) const;
+
+ /// Creates an Arrow field definition for the Blob type.
+ ///
+ /// This function constructs an Arrow Field (internally using
`arrow::large_binary()`)
+ /// and exports it to the C data interface structure `::ArrowSchema`.
+ /// It automatically injects Paimon-specific metadata to identify the
field as a BLOB.
+ ///
+ /// @param field_name The name of the Arrow field.
+ /// @param metadata A map of key-value metadata to be attached to the
field.
+ /// @return A result containing a unique pointer to the generated
`ArrowSchema` or an error.
+ static Result<std::unique_ptr<::ArrowSchema>> ArrowField(
+ const std::string& field_name, std::unordered_map<std::string,
std::string> metadata = {});
+
+ private:
+ class Impl;
+
+ explicit Blob(std::unique_ptr<Impl>&& impl);
+
+ std::unique_ptr<Impl> impl_;
+};
+
+} // namespace paimon
diff --git a/include/paimon/data/decimal.h b/include/paimon/data/decimal.h
new file mode 100644
index 0000000..707e3e5
--- /dev/null
+++ b/include/paimon/data/decimal.h
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+#include <cstdint>
+#include <cstring>
+#include <string>
+#include <vector>
+
+#include "paimon/visibility.h"
+
+namespace paimon {
+class Bytes;
+
+/// A data structure representing data of Decimal. It might be stored in a
compact representation
+/// (as a long value) if values are small enough.
+class PAIMON_EXPORT Decimal {
+ public:
+ using int128_t = __int128_t;
+ using uint128_t = __uint128_t;
+
+ Decimal(int32_t precision, int32_t scale, int128_t value)
+ : precision_(precision), scale_(scale), value_(value) {}
+
+ /// Get the **precision** of this decimal.
+ ///
+ /// The precision is the number of digits in the unscaled value.
+ int32_t Precision() const {
+ return precision_;
+ }
+
+ /// Get the **scale** of this decimal.
+ int32_t Scale() const {
+ return scale_;
+ }
+
+ /// Get the underlying int128_t value of this decimal.
+ int128_t Value() const {
+ return value_;
+ }
+
+ /// Get the low 64 bits of the decimal value.
+ uint64_t LowBits() const {
+ return static_cast<uint64_t>(value_ & 0xFFFFFFFFFFFFFFFF);
+ }
+
+ /// Get the high 64 bits of the decimal value.
+ uint64_t HighBits() const {
+ return static_cast<uint64_t>(value_ >> 64);
+ }
+
+ /// @return Whether the decimal value is small enough to be stored in a
long.
+ bool IsCompact() const {
+ return precision_ <= MAX_COMPACT_PRECISION;
+ }
+
+ std::string ToString() const;
+
+ /// @return Whether the decimal value is small enough to be stored in a
long.
+ static bool IsCompact(int32_t precision) {
+ return precision <= MAX_COMPACT_PRECISION;
+ }
+
+ /// Creates an instance of `Decimal` from an unscaled long value and the
given precision and
+ /// scale.
+ static Decimal FromUnscaledLong(int64_t unscaled_long, int32_t precision,
int32_t scale) {
+ return {precision, scale, unscaled_long};
+ }
+
+ /// @return A long describing the **unscaled value** of this decimal.
+ int64_t ToUnscaledLong() const {
+ int64_t ret = 0;
+ uint64_t low_bits = (value_ & 0xFFFFFFFFFFFFFFFF);
+ memcpy(&ret, &low_bits, sizeof(uint64_t));
+ return ret;
+ }
+
+ /// @return A byte array describing the **unscaled value** of this decimal.
+ std::vector<char> ToUnscaledBytes() const;
+
+ /// Creates an instance of `Decimal` from an unscaled byte array value and
the given precision
+ /// and scale.
+ static Decimal FromUnscaledBytes(int32_t precision, int32_t scale, Bytes*
bytes);
+
+ bool operator==(const Decimal& other) const {
+ if (this == &other) {
+ return true;
+ }
+ return CompareTo(other) == 0;
+ }
+
+ bool operator<(const Decimal& other) const {
+ if (this == &other) {
+ return false;
+ }
+ return CompareTo(other) < 0;
+ }
+
+ bool operator>(const Decimal& other) const {
+ if (this == &other) {
+ return false;
+ }
+ return CompareTo(other) > 0;
+ }
+
+ int32_t CompareTo(const Decimal& other) const;
+ static constexpr int32_t DEFAULT_PRECISION = 10;
+ static constexpr int32_t DEFAULT_SCALE = 0;
+ static constexpr int32_t MIN_PRECISION = 1;
+ static constexpr int32_t MAX_PRECISION = 38;
+
+ private:
+ static constexpr int32_t MAX_COMPACT_PRECISION = 18;
+ static const int128_t INT128_MAXIMUM_VALUE;
+ static const int128_t INT128_MINIMUM_VALUE;
+ static const int64_t POWERS_OF_TEN[MAX_COMPACT_PRECISION + 1];
+
+ static int32_t clz_u128(uint128_t u);
+ static int32_t count_leading_zero_bytes(uint128_t u);
+ static int32_t count_leading_all_ones_bytes(uint128_t u);
+ static int128_t DownScaleInt128(int128_t value, int32_t scale);
+ static int128_t ScaleInt128(int128_t value, int32_t scale, bool* overflow);
+
+ private:
+ int32_t precision_ = 0;
+ int32_t scale_ = 0;
+ int128_t value_ = 0;
+};
+
+} // namespace paimon
diff --git a/include/paimon/data/timestamp.h b/include/paimon/data/timestamp.h
new file mode 100644
index 0000000..93dfe1b
--- /dev/null
+++ b/include/paimon/data/timestamp.h
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <chrono>
+#include <cstdint>
+#include <string>
+
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+/// A data structure representing data of Timestamp without timezone.
+///
+/// This data structure is immutable and consists of a milliseconds and
nanos-of-millisecond since
+/// `1970-01-01 00:00:00`. It might be stored in a compact representation (as
a long value) if
+/// values are small enough. Timestamp range from 0000-01-01
00:00:00.000000000 to 9999-12-31
+/// 23:59:59.999999999.
+///
+class PAIMON_EXPORT Timestamp {
+ public:
+ Timestamp() : Timestamp(0, 0) {}
+ Timestamp(int64_t millisecond, int32_t nano_of_millisecond)
+ : millisecond_(millisecond), nano_of_millisecond_(nano_of_millisecond)
{
+ assert(nano_of_millisecond >= 0 && nano_of_millisecond <= 999999ll);
+ }
+
+ /// Get the number of milliseconds since `1970-01-01 00:00:00`.
+ int64_t GetMillisecond() const {
+ return millisecond_;
+ }
+ /// Get the number of nanoseconds (the nanoseconds within the
milliseconds).
+ ///
+ /// The value range is from 0 to 999,999.
+ int32_t GetNanoOfMillisecond() const {
+ return nano_of_millisecond_;
+ }
+
+ /// Creates an instance of `Timestamp` from milliseconds.
+ ///
+ /// The nanos-of-millisecond field will be set to zero.
+ ///
+ /// @param milliseconds The number of milliseconds since `1970-01-01
00:00:00`; a
+ /// negative number is the number of milliseconds before `1970-01-01
00:00:00`.
+ static Timestamp FromEpochMillis(int64_t milliseconds) {
+ return {milliseconds, 0};
+ }
+
+ /// Creates an instance of `Timestamp` from milliseconds and a
nanos-of-millisecond.
+ ///
+ /// @param milliseconds The number of milliseconds since `1970-01-01
00:00:00`; a
+ /// negative number is the number of milliseconds before `1970-01-01
00:00:00`.
+ /// @param nanos_of_millisecond The nanoseconds within the millisecond,
from 0 to 999,999.
+ static Timestamp FromEpochMillis(int64_t milliseconds, int32_t
nanos_of_millisecond) {
+ return {milliseconds, nanos_of_millisecond};
+ }
+
+ /// Converts this `Timestamp` object to millis `Timestamp` object (ignore
+ /// `nanos_of_millisecond`).
+ Timestamp ToMillisTimestamp() const {
+ return FromEpochMillis(millisecond_);
+ }
+
+ /// Converts this `Timestamp` object to microsecond.
+ int64_t ToMicrosecond() const;
+
+ /// Converts this `Timestamp` object to nanoseconds.
+ int64_t ToNanosecond() const {
+ std::chrono::nanoseconds nanosecond =
std::chrono::nanoseconds(nano_of_millisecond_) +
+
std::chrono::milliseconds(millisecond_);
+ return nanosecond.count();
+ }
+
+ /// @return Whether the timestamp data is small enough to be stored in a
long of
+ /// milliseconds.
+ static bool IsCompact(int32_t precision) {
+ return precision <= MAX_COMPACT_PRECISION;
+ }
+
+ bool operator==(const Timestamp& other) const {
+ if (this == &other) {
+ return true;
+ }
+ return millisecond_ == other.millisecond_ &&
+ nano_of_millisecond_ == other.nano_of_millisecond_;
+ }
+
+ bool operator!=(const Timestamp& other) const {
+ return !(*this == other);
+ }
+
+ bool operator<(const Timestamp& other) const {
+ if (millisecond_ == other.millisecond_) {
+ return nano_of_millisecond_ < other.nano_of_millisecond_;
+ }
+ return millisecond_ < other.millisecond_;
+ }
+
+ /// Converts the Timestamp object to a string representation in UTC (GMT).
+ ///
+ /// The format of the returned string is "YYYY-MM-DD HH:MM:SS.nnnnnnnnn",
+ /// where the date and time are in UTC (GMT), and the nanoseconds are
derived
+ /// from the millisecond and nanosecond parts of the timestamp.
+ ///
+ /// @note This method uses UTC (GMT) time zone when formatting the time.
+ /// This is different from the Java Paimon implementation, which may
convert
+ /// the timestamp to the local time zone of the machine running the Java
process.
+ std::string ToString() const;
+
+ static const int32_t DEFAULT_PRECISION;
+ static const int32_t MILLIS_PRECISION;
+ static const int32_t MIN_PRECISION;
+ static const int32_t MAX_PRECISION;
+ static const int32_t MAX_COMPACT_PRECISION;
+
+ private:
+ int64_t millisecond_ = 0;
+ int32_t nano_of_millisecond_ = 0;
+};
+
+} // namespace paimon
diff --git a/include/paimon/logging.h b/include/paimon/logging.h
new file mode 100644
index 0000000..0859d44
--- /dev/null
+++ b/include/paimon/logging.h
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <string>
+
+#include "paimon/visibility.h"
+
+namespace paimon {
+
// NOTE(review): only defined here; presumably a lookup key used by logging.cpp - confirm.
#define PAIMON_LOGGER_IMPL "logger_impl"

enum PaimonLogLevel {
    PAIMON_LOG_LEVEL_DEBUG = 0,
    PAIMON_LOG_LEVEL_INFO = 1,
    PAIMON_LOG_LEVEL_WARN = 2,
    PAIMON_LOG_LEVEL_ERROR = 3,
    PAIMON_LOG_LEVEL_NONE = 4,
    PAIMON_LOG_LEVEL_MAX = 5
};

// Emits a log record through `logger` (any expression usable with `->`, e.g. a raw or smart
// pointer to a Logger) when `level` is enabled; arguments are not evaluated otherwise.
//
// The format string is the first variadic argument rather than a named macro parameter. With a
// named `format` parameter followed by `, __VA_ARGS__`, a call that passes no format arguments
// (e.g. `PAIMON_LOG_INFO(logger, "done")`) expands to a dangling comma and does not compile in
// C++17. Existing calls of the form `PAIMON_LOG_INFO(logger, fmt, args...)` are unaffected.
#define PAIMON_LOG_V(level, logger, ...)                                                 \
    do {                                                                                 \
        if ((logger)->IsLevelEnabled(PAIMON_LOG_LEVEL_##level)) {                       \
            (logger)->LogV(PAIMON_LOG_LEVEL_##level, __FILE__, __LINE__, __FUNCTION__,  \
                           __VA_ARGS__);                                                 \
        }                                                                                \
    } while (0)

#define PAIMON_LOG_INFO(logger, ...) PAIMON_LOG_V(INFO, logger, __VA_ARGS__)
#define PAIMON_LOG_ERROR(logger, ...) PAIMON_LOG_V(ERROR, logger, __VA_ARGS__)
#define PAIMON_LOG_WARN(logger, ...) PAIMON_LOG_V(WARN, logger, __VA_ARGS__)
#define PAIMON_LOG_DEBUG(logger, ...) PAIMON_LOG_V(DEBUG, logger, __VA_ARGS__)
+
+class PAIMON_EXPORT Logger {
+ public:
+ using LoggerCreator = std::function<std::unique_ptr<Logger>(const
std::string&)>;
+
+ static void RegisterLogger(LoggerCreator creator);
+
+ static std::unique_ptr<Logger> GetLogger(const std::string& path);
+
+ virtual void LogV(PaimonLogLevel level, const char* fname, int lineno,
const char* function,
+ const char* fmt, ...) = 0;
+
+ virtual bool IsLevelEnabled(PaimonLogLevel level) const = 0;
+
+ virtual ~Logger() = default;
+
+ protected:
+ Logger() = default;
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/data/blob_defs.h
b/src/paimon/common/data/blob_defs.h
new file mode 100644
index 0000000..6a478e7
--- /dev/null
+++ b/src/paimon/common/data/blob_defs.h
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+
+namespace paimon {
+
/// Blob file format constants shared between writer and reader.
///
/// A Blob field uses the 'large_binary' type as its underlying physical storage in Apache Arrow
/// Schema, and is marked as the Paimon Blob extension type by attaching specific
/// **KeyValueMetadata**. Multiple blob fields in one paimon table are supported.
///
/// To create a Blob field:
/// @code
/// std::unordered_map<std::string, std::string> blob_metadata_map = {
///     {BlobDefs::kExtensionTypeKey, BlobDefs::kExtensionTypeValue}
/// };
/// auto field = arrow::field("my_blob_field", arrow::large_binary(), false,
///                           std::make_shared<arrow::KeyValueMetadata>(blob_metadata_map));
/// @endcode
class BlobDefs {
 public:
    // Pure constant holder; never instantiated.
    BlobDefs() = delete;
    ~BlobDefs() = delete;

    /// Metadata key identifying a Paimon Blob extension type field.
    static constexpr char kExtensionTypeKey[] = "paimon.extension.type";
    /// Metadata value identifying a Paimon Blob extension type field.
    static constexpr char kExtensionTypeValue[] = "paimon.type.blob";

    /// A bin_length value of -1 in the index indicates a null blob entry.
    static constexpr int64_t kNullBinLength = -1;
    /// Blob file format version.
    static constexpr int8_t kFileVersion = 1;
    /// Magic number identifying the start of each blob bin.
    static constexpr int32_t kMagicNumber = 1481511375;
    /// Offset from the start of a bin to the actual blob content (magic number size).
    static constexpr int32_t kContentStartOffset = 4;
    /// Total metadata length per bin: magic(4) + bin_length(8) + crc32(4) = 16.
    static constexpr int32_t kTotalMetaLength = 16;
    /// Blob file header length: index_len(4) + version(1) = 5.
    static constexpr uint32_t kBlobFileHeaderLength = 5;

    // Keep the derived lengths in sync with the field sizes they are documented to add up to.
    static_assert(kContentStartOffset == static_cast<int32_t>(sizeof(kMagicNumber)),
                  "blob content must start right after the magic number");
    static_assert(kTotalMetaLength == static_cast<int32_t>(sizeof(kMagicNumber) +
                                                           sizeof(int64_t) + sizeof(uint32_t)),
                  "bin metadata is magic + bin_length + crc32");
    static_assert(kBlobFileHeaderLength == sizeof(int32_t) + sizeof(kFileVersion),
                  "file header is index_len + version");
};
+
+} // namespace paimon
diff --git a/src/paimon/common/data/blob_descriptor.cpp
b/src/paimon/common/data/blob_descriptor.cpp
new file mode 100644
index 0000000..f5624c7
--- /dev/null
+++ b/src/paimon/common/data/blob_descriptor.cpp
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/data/blob_descriptor.h"
+
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/common/io/memory_segment_output_stream.h"
+#include "paimon/common/memory/memory_segment_utils.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/io/byte_order.h"
+#include "paimon/io/data_input_stream.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+Result<std::unique_ptr<BlobDescriptor>> BlobDescriptor::Create(const
std::string& uri,
+ int64_t offset,
int64_t length) {
+ return Create(kCurrentVersion, uri, offset, length);
+}
+
+Result<std::unique_ptr<BlobDescriptor>> BlobDescriptor::Create(int8_t version,
+ const
std::string& uri,
+ int64_t offset,
int64_t length) {
+ if (offset < 0) {
+ return Status::Invalid(fmt::format("offset {} is less than 0",
offset));
+ }
+ // length == -1 means it's dynamic length
+ if (length < -1) {
+ return Status::Invalid(fmt::format("length {} is less than -1",
length));
+ }
+ return std::unique_ptr<BlobDescriptor>(new BlobDescriptor(version, uri,
offset, length));
+}
+
+PAIMON_UNIQUE_PTR<Bytes> BlobDescriptor::Serialize(const
std::shared_ptr<MemoryPool>& pool) const {
+ MemorySegmentOutputStream
out(MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool);
+ out.SetOrder(ByteOrder::PAIMON_LITTLE_ENDIAN);
+ out.WriteValue<int8_t>(version_);
+ out.WriteValue<int64_t>(kMagic);
+ out.WriteValue<int32_t>(static_cast<int32_t>(uri_.size()));
+
+ auto uri_bytes = std::make_shared<Bytes>(uri_, pool.get());
+ out.WriteBytes(uri_bytes);
+ out.WriteValue<int64_t>(offset_);
+ out.WriteValue<int64_t>(length_);
+ return MemorySegmentUtils::CopyToBytes(out.Segments(), 0,
out.CurrentSize(), pool.get());
+}
+
+Result<std::unique_ptr<BlobDescriptor>> BlobDescriptor::Deserialize(const
char* buffer,
+ uint64_t
size) {
+ auto input_stream = std::make_shared<ByteArrayInputStream>(buffer, size);
+ DataInputStream in(std::move(input_stream));
+ in.SetOrder(ByteOrder::PAIMON_LITTLE_ENDIAN);
+ PAIMON_ASSIGN_OR_RAISE(int8_t version, in.ReadValue<int8_t>());
+ if (version > kCurrentVersion) {
+ return Status::Invalid(fmt::format(
+ "Expecting BlobDescriptor version to be less than or equal to {},
but found {}.",
+ kCurrentVersion, version));
+ }
+ if (version > 1) {
+ PAIMON_ASSIGN_OR_RAISE(int64_t magic, in.ReadValue<int64_t>());
+ if (kMagic != magic) {
+ return Status::Invalid(fmt::format(
+ "Invalid BlobDescriptor: missing magic header. Expected magic:
{}, but found {}",
+ kMagic, magic));
+ }
+ }
+ PAIMON_ASSIGN_OR_RAISE(int32_t uri_length, in.ReadValue<int32_t>());
+ std::string uri(uri_length, '\0');
+ PAIMON_RETURN_NOT_OK(in.Read(uri.data(), uri.size()));
+ PAIMON_ASSIGN_OR_RAISE(int64_t offset, in.ReadValue<int64_t>());
+ PAIMON_ASSIGN_OR_RAISE(int64_t length, in.ReadValue<int64_t>());
+ return BlobDescriptor::Create(version, uri, offset, length);
+}
+
+Result<bool> BlobDescriptor::IsBlobDescriptor(const char* buffer, uint64_t
size) {
+ if (size < kMinDescriptorLength) {
+ return false;
+ }
+ auto input_stream = std::make_shared<ByteArrayInputStream>(buffer, size);
+ DataInputStream in(std::move(input_stream));
+ in.SetOrder(ByteOrder::PAIMON_LITTLE_ENDIAN);
+ PAIMON_ASSIGN_OR_RAISE(int8_t version, in.ReadValue<int8_t>());
+ if (version > kCurrentVersion) {
+ return false;
+ }
+ PAIMON_ASSIGN_OR_RAISE(int64_t magic, in.ReadValue<int64_t>());
+ return kMagic == magic;
+}
+
+std::string BlobDescriptor::ToString() const {
+ return fmt::format("BlobDescriptor{{version={}, uri='{}', offset={},
length={}}}", version_,
+ uri_, offset_, length_);
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/data/blob_descriptor.h
b/src/paimon/common/data/blob_descriptor.h
new file mode 100644
index 0000000..6664b85
--- /dev/null
+++ b/src/paimon/common/data/blob_descriptor.h
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/result.h"
+
+namespace paimon {
+/// Blob descriptor to describe a blob reference.
+/// Memory Layout Description: All multi-byte numerical values (int/long) are
stored using Little
+/// Endian byte order.
+///
+/// | Offset | Field Name | Type | Size |
+/// |--------|---------------|-----------|------|
+/// | 0 | version | byte | 1 |
+/// | 1 | magic_number | long | 8 |
+/// | 9 | uri_length | int | 4 |
+/// | 13 | uri_bytes | byte[N] | N |
+/// | 13 + N | offset | long | 8 |
+/// | 21 + N | length | long | 8 |
+
+class BlobDescriptor {
+ public:
+ static Result<std::unique_ptr<BlobDescriptor>> Create(const std::string&
uri, int64_t offset,
+ int64_t length);
+
+ static Result<std::unique_ptr<BlobDescriptor>> Create(int8_t version,
const std::string& uri,
+ int64_t offset,
int64_t length);
+
+ static Result<std::unique_ptr<BlobDescriptor>> Deserialize(const char*
buffer, uint64_t size);
+
+ static Result<bool> IsBlobDescriptor(const char* buffer, uint64_t size);
+
+ PAIMON_UNIQUE_PTR<Bytes> Serialize(const std::shared_ptr<MemoryPool>&
pool) const;
+
+ std::string ToString() const;
+
+ int8_t Version() const {
+ return version_;
+ }
+
+ const std::string& Uri() const {
+ return uri_;
+ }
+
+ int64_t Offset() const {
+ return offset_;
+ }
+
+ int64_t Length() const {
+ return length_;
+ }
+
+ private:
+ BlobDescriptor(int8_t version, const std::string& uri, int64_t offset,
int64_t length)
+ : version_(version), uri_(uri), offset_(offset), length_(length) {}
+
+ private:
+ static constexpr int64_t kMagic = 0x424C4F4244455343l;
+ /// one byte for version, eight bytes for magic number.
+ static constexpr uint64_t kMinDescriptorLength = 9;
+ static constexpr int8_t kCurrentVersion = 2;
+
+ const int8_t version_ = kCurrentVersion;
+ std::string uri_;
+ int64_t offset_ = 0;
+ int64_t length_ = -1;
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/data/blob_descriptor_test.cpp
b/src/paimon/common/data/blob_descriptor_test.cpp
new file mode 100644
index 0000000..a5221bf
--- /dev/null
+++ b/src/paimon/common/data/blob_descriptor_test.cpp
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/data/blob_descriptor.h"
+
+#include <memory>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+class BlobDescriptorTest : public testing::Test {
+ public:
+ void SetUp() override {
+ pool_ = GetDefaultPool();
+ ASSERT_OK_AND_ASSIGN(descriptor_,
+ BlobDescriptor::Create("test_uri",
/*offset=*/1024, /*length=*/2048));
+ }
+
+ private:
+ std::shared_ptr<MemoryPool> pool_;
+ std::unique_ptr<BlobDescriptor> descriptor_;
+};
+
+TEST_F(BlobDescriptorTest, TestConstructorAndGetters) {
+ ASSERT_EQ(descriptor_->Uri(), "test_uri");
+ ASSERT_EQ(descriptor_->Offset(), 1024);
+ ASSERT_EQ(descriptor_->Length(), 2048);
+}
+
+TEST_F(BlobDescriptorTest, TestDeserializeCompatibilityForJavaWithVersion1) {
+ std::vector<char> bytes = {1, 8, 0, 0, 0, 116, 101, 115, 116, 95, 117,
114, 105, 0, 4,
+ 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0,
0, 0};
+ auto java_serialized = std::string(bytes.data(), bytes.size());
+
+ ASSERT_OK_AND_ASSIGN(auto descriptor,
BlobDescriptor::Deserialize(java_serialized.data(),
+
java_serialized.size()));
+ ASSERT_EQ(descriptor->Version(), (int8_t)1);
+ ASSERT_EQ(descriptor->Uri(), "test_uri");
+ ASSERT_EQ(descriptor->Offset(), 1024);
+ ASSERT_EQ(descriptor->Length(), 2048);
+}
+
+TEST_F(BlobDescriptorTest, TestDeserializeCompatibilityForJavaWithVersion2) {
+ std::vector<char> bytes = {2, 67, 83, 69, 68, 66, 79, 76, 66, 8,
0, 0, 0,
+ 116, 101, 115, 116, 95, 117, 114, 105, 0, 4,
0, 0, 0,
+ 0, 0, 0, 0, 8, 0, 0, 0, 0, 0,
0};
+ auto java_serialized = std::string(bytes.data(), bytes.size());
+
+ ASSERT_OK_AND_ASSIGN(auto descriptor,
BlobDescriptor::Deserialize(java_serialized.data(),
+
java_serialized.size()));
+ ASSERT_EQ(descriptor->Version(), (int8_t)2);
+ ASSERT_EQ(descriptor->Uri(), "test_uri");
+ ASSERT_EQ(descriptor->Offset(), 1024);
+ ASSERT_EQ(descriptor->Length(), 2048);
+
+ PAIMON_UNIQUE_PTR<Bytes> cpp_serialized = descriptor->Serialize(pool_);
+ auto cpp_serialized_string = std::string(cpp_serialized->data(),
cpp_serialized->size());
+ ASSERT_EQ(cpp_serialized_string, java_serialized);
+}
+
+TEST_F(BlobDescriptorTest, TestSerializeDeserializeWithEmptyUri) {
+ ASSERT_OK_AND_ASSIGN(std::unique_ptr<BlobDescriptor> empty_uri_descriptor,
+ BlobDescriptor::Create(/*uri=*/"", /*offset=*/0,
/*length=*/0));
+ auto serialized = empty_uri_descriptor->Serialize(pool_);
+ ASSERT_OK_AND_ASSIGN(auto restored_descriptor,
+ BlobDescriptor::Deserialize(serialized->data(),
serialized->size()));
+
+ ASSERT_EQ(restored_descriptor->Uri(), "");
+ ASSERT_EQ(restored_descriptor->Offset(), 0);
+ ASSERT_EQ(restored_descriptor->Length(), 0);
+}
+
+TEST_F(BlobDescriptorTest, TestSerializeDeserializeWithDynamicLength) {
+ ASSERT_OK_AND_ASSIGN(std::unique_ptr<BlobDescriptor> empty_uri_descriptor,
+ BlobDescriptor::Create("test_uri", /*offset=*/0,
/*length=*/-1));
+ auto serialized = empty_uri_descriptor->Serialize(pool_);
+ ASSERT_OK_AND_ASSIGN(auto restored_descriptor,
+ BlobDescriptor::Deserialize(serialized->data(),
serialized->size()));
+
+ ASSERT_EQ(restored_descriptor->Uri(), "test_uri");
+ ASSERT_EQ(restored_descriptor->Offset(), 0);
+ ASSERT_EQ(restored_descriptor->Length(), -1);
+}
+
+TEST_F(BlobDescriptorTest, TestInvalidParameters) {
+ // Test deserialize invalid version
+ {
+ ASSERT_OK_AND_ASSIGN(std::unique_ptr<BlobDescriptor> descriptor,
+ BlobDescriptor::Create(/*uri=*/"test",
/*offset=*/1, /*length=*/2));
+ auto serialized = descriptor->Serialize(pool_);
+ (*serialized)[0] = '\x03';
+ ASSERT_NOK_WITH_MSG(
+ BlobDescriptor::Deserialize(serialized->data(),
serialized->size()),
+ "Expecting BlobDescriptor version to be less than or equal to 2,
but found 3");
+ }
+ // Test deserialize invalid buffer size
+ {
+ ASSERT_OK_AND_ASSIGN(std::unique_ptr<BlobDescriptor> descriptor,
+ BlobDescriptor::Create(/*uri=*/"test",
/*offset=*/1, /*length=*/2));
+ auto serialized = descriptor->Serialize(pool_);
+ ASSERT_NOK(BlobDescriptor::Deserialize(serialized->data(),
/*size=*/5));
+ }
+ // Test invalid offset
+ {
+ ASSERT_NOK_WITH_MSG(BlobDescriptor::Create(/*uri=*/"test",
/*offset=*/-1, /*length=*/2),
+ "offset -1 is less than 0");
+ }
+ // Test invalid length
+ {
+ ASSERT_NOK_WITH_MSG(BlobDescriptor::Create(/*uri=*/"test",
/*offset=*/1, /*length=*/-2),
+ "length -2 is less than -1");
+ }
+}
+
+TEST_F(BlobDescriptorTest, TestToString) {
+ std::string debug_str = descriptor_->ToString();
+ ASSERT_FALSE(debug_str.empty());
+ ASSERT_TRUE(debug_str.find("version=2") != std::string::npos);
+ ASSERT_TRUE(debug_str.find("uri='test_uri'") != std::string::npos);
+ ASSERT_TRUE(debug_str.find("offset=1024") != std::string::npos);
+ ASSERT_TRUE(debug_str.find("length=2048") != std::string::npos);
+}
+
+TEST_F(BlobDescriptorTest, TestRoundTripConsistency) {
+ auto first_serialized = descriptor_->Serialize(pool_);
+ ASSERT_OK_AND_ASSIGN(
+ auto first_restored,
+ BlobDescriptor::Deserialize(first_serialized->data(),
first_serialized->size()));
+ auto second_serialized = first_restored->Serialize(pool_);
+ ASSERT_EQ(*first_serialized, *second_serialized);
+
+ ASSERT_OK_AND_ASSIGN(
+ auto second_restored,
+ BlobDescriptor::Deserialize(second_serialized->data(),
second_serialized->size()));
+ ASSERT_EQ(second_restored->Uri(), "test_uri");
+ ASSERT_EQ(second_restored->Offset(), 1024);
+ ASSERT_EQ(second_restored->Length(), 2048);
+}
+
+TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithValidDescriptor) {
+ // A valid v2 descriptor should be recognized
+ auto serialized = descriptor_->Serialize(pool_);
+ ASSERT_OK_AND_ASSIGN(bool result,
+ BlobDescriptor::IsBlobDescriptor(serialized->data(),
serialized->size()));
+ ASSERT_TRUE(result);
+}
+
+TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithTooShortBuffer) {
+ // Buffer shorter than 9 bytes should return false
+ std::vector<char> short_buffer = {0x02, 0x43, 0x53, 0x45, 0x44, 0x42,
0x4F, 0x4C};
+ ASSERT_OK_AND_ASSIGN(
+ bool result, BlobDescriptor::IsBlobDescriptor(short_buffer.data(),
short_buffer.size()));
+ ASSERT_FALSE(result);
+
+ // Empty buffer
+ ASSERT_OK_AND_ASSIGN(bool empty_result,
BlobDescriptor::IsBlobDescriptor(nullptr, 0));
+ ASSERT_FALSE(empty_result);
+}
+
+TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithFutureVersion) {
+ // Version > CURRENT_VERSION should return false (not an error)
+ auto serialized = descriptor_->Serialize(pool_);
+ (*serialized)[0] = '\x03'; // set version to 3 (> CURRENT_VERSION)
+ ASSERT_OK_AND_ASSIGN(bool result,
+ BlobDescriptor::IsBlobDescriptor(serialized->data(),
serialized->size()));
+ ASSERT_FALSE(result);
+}
+
+TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithWrongMagic) {
+ // Wrong magic number should return false
+ auto serialized = descriptor_->Serialize(pool_);
+ // Corrupt the magic bytes (bytes 1-8)
+ (*serialized)[1] = '\x00';
+ (*serialized)[2] = '\x00';
+ ASSERT_OK_AND_ASSIGN(bool result,
+ BlobDescriptor::IsBlobDescriptor(serialized->data(),
serialized->size()));
+ ASSERT_FALSE(result);
+}
+
+TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithRandomData) {
+ // Random data that doesn't match blob descriptor format
+ std::vector<char> random_data = {0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06,
0x07, 0x08, 0x09};
+ ASSERT_OK_AND_ASSIGN(bool result,
+ BlobDescriptor::IsBlobDescriptor(random_data.data(),
random_data.size()));
+ ASSERT_FALSE(result);
+}
+
+TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithVersion1Data) {
+ // v1 data: version=1, followed by uri_length (not magic), should return
false
+ // because reading bytes 1-8 as magic won't match MAGIC constant
+ std::vector<char> v1_bytes = {1, 8, 0, 0, 0, 116, 101, 115, 116, 95, 117,
114, 105, 0, 4,
+ 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0,
0, 0, 0};
+ ASSERT_OK_AND_ASSIGN(bool result,
+ BlobDescriptor::IsBlobDescriptor(v1_bytes.data(),
v1_bytes.size()));
+ ASSERT_FALSE(result);
+}
+
+TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithExactly9Bytes) {
+ // Exactly 9 bytes with valid version and magic should return true
+ // version=2, magic=0x424C4F4244455343 in little-endian
+ std::vector<char> minimal = {0x02, 0x43, 0x53, 0x45, 0x44, 0x42, 0x4F,
0x4C, 0x42};
+ ASSERT_OK_AND_ASSIGN(bool result,
+ BlobDescriptor::IsBlobDescriptor(minimal.data(),
minimal.size()));
+ ASSERT_TRUE(result);
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/data/blob_test.cpp
b/src/paimon/common/data/blob_test.cpp
new file mode 100644
index 0000000..4c95711
--- /dev/null
+++ b/src/paimon/common/data/blob_test.cpp
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/data/blob.h"
+
+#include "arrow/api.h"
+#include "arrow/c/bridge.h"
+#include "gtest/gtest.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+class BlobTest : public ::testing::Test {
+ public:
+ void SetUp() override {
+ pool_ = GetDefaultPool();
+ dir_ = UniqueTestDirectory::Create();
+ ASSERT_TRUE(dir_);
+ uri_ = dir_->Str() + "/file.blob";
+ file_system_ = std::make_shared<LocalFileSystem>();
+ ASSERT_OK(file_system_->WriteFile(uri_, "abcdefghijklmn", true));
+ }
+ void TearDown() override {}
+
+ private:
+ std::shared_ptr<MemoryPool> pool_;
+ std::unique_ptr<UniqueTestDirectory> dir_;
+ std::shared_ptr<FileSystem> file_system_;
+ std::string uri_;
+};
+
+TEST_F(BlobTest, TestSimple) {
+ ASSERT_OK_AND_ASSIGN(std::unique_ptr<Blob> blob, Blob::FromPath(uri_));
+ auto serialized = blob->ToDescriptor(pool_);
+
+ ASSERT_OK_AND_ASSIGN(auto restored_blob,
+ Blob::FromDescriptor(serialized->data(),
serialized->size()));
+ ASSERT_EQ(*restored_blob->ToDescriptor(pool_), *serialized);
+ ASSERT_OK_AND_ASSIGN(auto input_stream,
restored_blob->NewInputStream(file_system_));
+ ASSERT_EQ(uri_, restored_blob->Uri());
+ std::string str(14, '\0');
+ ASSERT_OK(input_stream->Read(str.data(), str.size()));
+ ASSERT_EQ("abcdefghijklmn", str);
+}
+
+TEST_F(BlobTest, TestInvalidParameters) {
+ // Test null file system in NewInputStream
+ ASSERT_OK_AND_ASSIGN(std::unique_ptr<Blob> blob, Blob::FromPath(uri_));
+ ASSERT_NOK_WITH_MSG(blob->NewInputStream(nullptr), "file system is
nullptr");
+
+ // Test invalid descriptor
+ std::string invalid_bytes = "invalid_descriptor_bytes";
+ ASSERT_NOK(Blob::FromDescriptor(invalid_bytes.data(),
invalid_bytes.size()));
+}
+
+TEST_F(BlobTest, TestRoundTripConsistency) {
+ ASSERT_OK_AND_ASSIGN(std::unique_ptr<Blob> original_blob,
+ Blob::FromPath(uri_, /*offset=*/1, /*length=*/5));
+
+ auto first_descriptor = original_blob->ToDescriptor(pool_);
+ ASSERT_OK_AND_ASSIGN(auto restored_blob,
+ Blob::FromDescriptor(first_descriptor->data(),
first_descriptor->size()));
+
+ auto second_descriptor = restored_blob->ToDescriptor(pool_);
+ ASSERT_EQ(*first_descriptor, *second_descriptor);
+
+ // Verify scheme consistency
+ ASSERT_EQ(original_blob->Uri(), restored_blob->Uri());
+}
+
+TEST_F(BlobTest, TestInputStreamCreation) {
+ ASSERT_OK_AND_ASSIGN(std::unique_ptr<Blob> blob, Blob::FromPath(uri_));
+ ASSERT_OK_AND_ASSIGN(auto input_stream,
blob->NewInputStream(file_system_));
+ ASSERT_TRUE(input_stream);
+
+ // Test reading from the input stream
+ std::string buffer(14, '\0');
+ ASSERT_OK_AND_ASSIGN(auto bytes_read, input_stream->Read(buffer.data(),
buffer.size()));
+ ASSERT_EQ(14, bytes_read);
+ ASSERT_EQ("abcdefghijklmn", buffer);
+}
+
+TEST_F(BlobTest, TestBoundaryConditions) {
+ // Test with empty path
+ ASSERT_NOK_WITH_MSG(Blob::FromPath(""), "path is an empty string");
+
+ // Test with very large offset and length values
+ ASSERT_OK_AND_ASSIGN(std::unique_ptr<Blob> blob,
+ Blob::FromPath(uri_, /*offset=*/INT64_MAX,
/*length=*/1));
+ auto serialized = blob->ToDescriptor(pool_);
+
+ ASSERT_OK_AND_ASSIGN(auto restored_blob,
+ Blob::FromDescriptor(serialized->data(),
serialized->size()));
+ ASSERT_EQ(*restored_blob->ToDescriptor(pool_), *serialized);
+}
+
+TEST_F(BlobTest, TestNewInputStreamWithOffsetAndLength) {
+ // Create blob with offset=2, length=6 (should read "cdefgh")
+ ASSERT_OK_AND_ASSIGN(std::unique_ptr<Blob> blob,
+ Blob::FromPath(uri_, /*offset=*/2, /*length=*/6));
+ ASSERT_OK_AND_ASSIGN(auto input_stream,
blob->NewInputStream(file_system_));
+ ASSERT_TRUE(input_stream);
+
+ ASSERT_OK_AND_ASSIGN(uint64_t length, input_stream->Length());
+ ASSERT_EQ(6, length);
+
+ // Test reading with offset and length applied
+ std::string buffer(6, '\0');
+ ASSERT_OK_AND_ASSIGN(auto bytes_read, input_stream->Read(buffer.data(),
buffer.size()));
+ ASSERT_EQ(6, bytes_read);
+ ASSERT_EQ("cdefgh", buffer);
+}
+
+TEST_F(BlobTest, TestNewInputStreamWithDynamicLength) {
+ // Create blob with offset=2, dynamic length (-1 means read to end)
+ ASSERT_OK_AND_ASSIGN(std::unique_ptr<Blob> blob,
+ Blob::FromPath(uri_, /*offset=*/2, /*length=*/-1));
+ ASSERT_OK_AND_ASSIGN(auto input_stream,
blob->NewInputStream(file_system_));
+ ASSERT_TRUE(input_stream);
+
+ ASSERT_OK_AND_ASSIGN(uint64_t length, input_stream->Length());
+ ASSERT_EQ(12, length);
+
+ // Test reading from offset to end (should read "cdefghijklmn")
+ std::string buffer(12, '\0');
+ ASSERT_OK_AND_ASSIGN(auto bytes_read, input_stream->Read(buffer.data(),
buffer.size()));
+ ASSERT_EQ(12, bytes_read);
+ ASSERT_EQ("cdefghijklmn", buffer);
+}
+
+TEST_F(BlobTest, TestArrowField) {
+ {
+ // basic: field name, non-nullable by default
+ ASSERT_OK_AND_ASSIGN(auto schema, Blob::ArrowField("my_blob"));
+ ASSERT_NE(schema, nullptr);
+
+ // import back to arrow::Field to verify
+ auto field_result = arrow::ImportField(schema.get());
+ ASSERT_TRUE(field_result.ok());
+ auto field = field_result.ValueUnsafe();
+
+ ASSERT_EQ(field->name(), "my_blob");
+ ASSERT_EQ(field->type()->id(), arrow::Type::LARGE_BINARY);
+ ASSERT_FALSE(field->nullable());
+ ASSERT_TRUE(field->HasMetadata());
+ auto extension_type = field->metadata()->Get("paimon.extension.type");
+ ASSERT_TRUE(extension_type.ok());
+ ASSERT_EQ(extension_type.ValueUnsafe(), "paimon.type.blob");
+ }
+ {
+ // with custom metadata
+ std::unordered_map<std::string, std::string> custom_metadata = {
+ {"custom_key", "custom_value"}};
+ ASSERT_OK_AND_ASSIGN(auto schema, Blob::ArrowField("meta_blob",
custom_metadata));
+ auto field = arrow::ImportField(schema.get()).ValueUnsafe();
+ ASSERT_EQ(field->name(), "meta_blob");
+ ASSERT_FALSE(field->nullable());
+ ASSERT_TRUE(field->HasMetadata());
+ // blob extension metadata should be present
+ auto extension_type = field->metadata()->Get("paimon.extension.type");
+ ASSERT_TRUE(extension_type.ok());
+ ASSERT_EQ(extension_type.ValueUnsafe(), "paimon.type.blob");
+ // custom metadata should also be present
+ auto custom_val = field->metadata()->Get("custom_key");
+ ASSERT_TRUE(custom_val.ok());
+ ASSERT_EQ(custom_val.ValueUnsafe(), "custom_value");
+ }
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/data/blob_utils.cpp
b/src/paimon/common/data/blob_utils.cpp
new file mode 100644
index 0000000..9eb716a
--- /dev/null
+++ b/src/paimon/common/data/blob_utils.cpp
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/data/blob_utils.h"
+
+#include <cstddef>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/type.h"
+#include "paimon/common/data/blob_defs.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/string_utils.h"
+
+namespace arrow {
+class Array;
+}
+
+namespace paimon {
+
+BlobUtils::SeparatedSchemas BlobUtils::SeparateBlobSchema(
+ const std::shared_ptr<arrow::Schema>& schema) {
+ std::vector<std::shared_ptr<arrow::Field>> remaining_fields;
+ std::vector<std::shared_ptr<arrow::Field>> blob_fields;
+ for (auto i = 0; i < schema->num_fields(); i++) {
+ auto field = schema->field(i);
+ if (IsBlobField(field)) {
+ blob_fields.emplace_back(field);
+ } else {
+ remaining_fields.emplace_back(field);
+ }
+ }
+ SeparatedSchemas result;
+ result.main_schema = arrow::schema(remaining_fields);
+ result.blob_schema = arrow::schema(blob_fields);
+ return result;
+}
+
+Result<BlobUtils::SeparatedStructArrays> BlobUtils::SeparateBlobArray(
+ const std::shared_ptr<arrow::StructArray>& struct_array) {
+ std::shared_ptr<arrow::StructType> old_type =
+ std::static_pointer_cast<arrow::StructType>(struct_array->type());
+ const auto& old_fields = old_type->fields();
+ const auto& old_arrays = struct_array->fields();
+
+ std::vector<std::shared_ptr<arrow::Field>> remaining_fields;
+ std::vector<std::shared_ptr<arrow::Array>> remaining_arrays;
+ std::vector<std::shared_ptr<arrow::Field>> blob_fields;
+ std::vector<std::shared_ptr<arrow::Array>> blob_arrays;
+
+ for (size_t i = 0; i < old_fields.size(); i++) {
+ if (IsBlobField(old_fields[i])) {
+ blob_fields.push_back(old_fields[i]);
+ blob_arrays.push_back(old_arrays[i]);
+ } else {
+ remaining_fields.push_back(old_fields[i]);
+ remaining_arrays.push_back(old_arrays[i]);
+ }
+ }
+
+ SeparatedStructArrays result;
+ PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(result.main_array,
+
arrow::StructArray::Make(remaining_arrays, remaining_fields));
+ PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(result.blob_array,
+ arrow::StructArray::Make(blob_arrays,
blob_fields));
+ return result;
+}
+
+bool BlobUtils::IsBlobField(const std::shared_ptr<arrow::Field>& field) {
+ const auto& type = field->type();
+ if (type->id() != arrow::Type::LARGE_BINARY) {
+ return false;
+ }
+ if (!field->HasMetadata()) {
+ return false;
+ }
+ return IsBlobMetadata(field->metadata());
+}
+
+bool BlobUtils::IsBlobMetadata(const std::shared_ptr<const
arrow::KeyValueMetadata>& metadata) {
+ if (!metadata) {
+ return false;
+ }
+ auto extension_name = metadata->Get(BlobDefs::kExtensionTypeKey);
+ if (!extension_name.ok()) {
+ return false;
+ }
+ return extension_name.ValueUnsafe() == BlobDefs::kExtensionTypeValue;
+}
+
+bool BlobUtils::IsBlobFile(const std::string& file_name) {
+ return StringUtils::EndsWith(file_name, ".blob");
+}
+
+std::shared_ptr<arrow::Field> BlobUtils::ToArrowField(
+ const std::string& field_name, bool nullable,
+ std::unordered_map<std::string, std::string> metadata) {
+ metadata[BlobDefs::kExtensionTypeKey] = BlobDefs::kExtensionTypeValue;
+ return arrow::field(field_name, arrow::large_binary(), nullable,
+ std::make_shared<arrow::KeyValueMetadata>(metadata));
+}
+} // namespace paimon
diff --git a/src/paimon/common/data/blob_utils.h
b/src/paimon/common/data/blob_utils.h
new file mode 100644
index 0000000..505cad5
--- /dev/null
+++ b/src/paimon/common/data/blob_utils.h
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+
+namespace arrow {
+class Field;
+class KeyValueMetadata;
+class Schema;
+class StructArray;
+} // namespace arrow
+
+namespace paimon {
+/// Utils for blob type.
+class PAIMON_EXPORT BlobUtils {
+ public:
+ BlobUtils() = delete;
+ ~BlobUtils() = delete;
+
+ struct SeparatedSchemas {
+ /// Non-blob fields
+ std::shared_ptr<arrow::Schema> main_schema;
+ /// Blob fields only
+ std::shared_ptr<arrow::Schema> blob_schema;
+ };
+
+ struct SeparatedStructArrays {
+ /// Non-blob fields
+ std::shared_ptr<arrow::StructArray> main_array;
+ /// Blob fields only
+ std::shared_ptr<arrow::StructArray> blob_array;
+ };
+
+ static SeparatedSchemas SeparateBlobSchema(const
std::shared_ptr<arrow::Schema>& schema);
+
+ static Result<SeparatedStructArrays> SeparateBlobArray(
+ const std::shared_ptr<arrow::StructArray>& struct_array);
+
+ static bool IsBlobField(const std::shared_ptr<arrow::Field>& field);
+ static bool IsBlobMetadata(const std::shared_ptr<const
arrow::KeyValueMetadata>& metadata);
+ static bool IsBlobFile(const std::string& file_name);
+
+ static std::shared_ptr<arrow::Field> ToArrowField(
+ const std::string& field_name, bool nullable = false,
+ std::unordered_map<std::string, std::string> metadata = {});
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/data/blob_utils_test.cpp
b/src/paimon/common/data/blob_utils_test.cpp
new file mode 100644
index 0000000..d042ecd
--- /dev/null
+++ b/src/paimon/common/data/blob_utils_test.cpp
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/data/blob_utils.h"
+
+#include "arrow/api.h"
+#include "arrow/c/bridge.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/blob_defs.h"
+#include "paimon/data/blob.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+class BlobUtilsTest : public ::testing::Test {
+ private:
+ std::shared_ptr<arrow::KeyValueMetadata> CreateBlobMetadata() {
+ std::unordered_map<std::string, std::string> blob_metadata_map = {
+ {BlobDefs::kExtensionTypeKey, BlobDefs::kExtensionTypeValue}};
+ return std::make_shared<arrow::KeyValueMetadata>(blob_metadata_map);
+ }
+};
+
+TEST_F(BlobUtilsTest, IsBlobMetadata) {
+ auto correct_metadata = CreateBlobMetadata();
+ EXPECT_TRUE(BlobUtils::IsBlobMetadata(correct_metadata));
+ EXPECT_FALSE(BlobUtils::IsBlobMetadata(nullptr));
+ std::unordered_map<std::string, std::string> wrong_metadata_map = {
+ {BlobDefs::kExtensionTypeKey, "paimon.type.varchar"}};
+ auto wrong_metadata =
std::make_shared<arrow::KeyValueMetadata>(wrong_metadata_map);
+ EXPECT_FALSE(BlobUtils::IsBlobMetadata(wrong_metadata));
+ std::unordered_map<std::string, std::string> no_extension_metadata_map = {
+ {"other_key", BlobDefs::kExtensionTypeValue}};
+ auto no_extension_metadata =
+ std::make_shared<arrow::KeyValueMetadata>(no_extension_metadata_map);
+ EXPECT_FALSE(BlobUtils::IsBlobMetadata(no_extension_metadata));
+}
+
+TEST_F(BlobUtilsTest, IsBlobField) {
+ std::shared_ptr<arrow::Field> blob_field = BlobUtils::ToArrowField("f1",
true);
+ EXPECT_TRUE(BlobUtils::IsBlobField(blob_field));
+
+ auto int_field = arrow::field("i_int", arrow::int32());
+ EXPECT_FALSE(BlobUtils::IsBlobField(int_field));
+
+ auto binary_field_no_meta = arrow::field("b_no_meta",
arrow::large_binary());
+ EXPECT_FALSE(BlobUtils::IsBlobField(binary_field_no_meta));
+
+ auto wrong_meta = std::make_shared<arrow::KeyValueMetadata>(
+ std::unordered_map<std::string, std::string>{{"other_key", "value"}});
+ auto binary_field_wrong_meta =
+ arrow::field("b_wrong_meta", arrow::large_binary(), false, wrong_meta);
+ EXPECT_FALSE(BlobUtils::IsBlobField(binary_field_wrong_meta));
+}
+
+TEST_F(BlobUtilsTest, SeparateBlobSchema) {
+ auto int_field = arrow::field("f1_int", arrow::int32());
+ auto string_field = arrow::field("f2_string", arrow::utf8());
+ std::shared_ptr<arrow::Field> blob_field_1 =
BlobUtils::ToArrowField("f3_blob_1", true);
+ {
+ std::shared_ptr<arrow::Schema> original_schema =
+ arrow::schema({int_field, string_field, blob_field_1});
+
+ BlobUtils::SeparatedSchemas schemas =
BlobUtils::SeparateBlobSchema(original_schema);
+
+ std::shared_ptr<arrow::Schema> expected_main_schema =
+ arrow::schema({int_field, string_field});
+ ASSERT_TRUE(schemas.main_schema->Equals(*expected_main_schema));
+
+ std::shared_ptr<arrow::Schema> expected_blob_schema =
arrow::schema({blob_field_1});
+ ASSERT_TRUE(schemas.blob_schema->Equals(*expected_blob_schema));
+ }
+ {
+ std::shared_ptr<arrow::Schema> no_blob_schema =
arrow::schema({int_field, string_field});
+ BlobUtils::SeparatedSchemas no_blob_schemas =
BlobUtils::SeparateBlobSchema(no_blob_schema);
+ ASSERT_TRUE(no_blob_schemas.main_schema->Equals(*no_blob_schema));
+ ASSERT_EQ(no_blob_schemas.blob_schema->num_fields(), 0);
+ }
+ {
+ std::shared_ptr<arrow::Schema> only_blob_schema =
arrow::schema({blob_field_1});
+ BlobUtils::SeparatedSchemas only_blob_schemas =
+ BlobUtils::SeparateBlobSchema(only_blob_schema);
+ ASSERT_TRUE(only_blob_schemas.blob_schema->Equals(*only_blob_schema));
+ ASSERT_EQ(only_blob_schemas.main_schema->num_fields(), 0);
+ }
+}
+
+TEST_F(BlobUtilsTest, SeparateBlobArray) {
+ auto int_field = arrow::field("f1_int", arrow::int32());
+ std::shared_ptr<arrow::Field> blob_field =
BlobUtils::ToArrowField("f2_blob", false);
+ auto string_field = arrow::field("f3_string", arrow::utf8());
+ auto schema = arrow::schema({int_field, blob_field, string_field});
+
+ arrow::Int32Builder int_builder;
+ ASSERT_TRUE(int_builder.AppendValues({1, 2, 3}).ok());
+ auto int_array = int_builder.Finish().ValueOrDie();
+
+ arrow::StringBuilder string_builder;
+ ASSERT_TRUE(string_builder.AppendValues({"a", "b", "c"}).ok());
+ auto string_array = string_builder.Finish().ValueOrDie();
+
+ arrow::LargeBinaryBuilder blob_builder;
+ ASSERT_TRUE(blob_builder.Append("1", 1).ok());
+ ASSERT_TRUE(blob_builder.Append("2", 1).ok());
+ ASSERT_TRUE(blob_builder.Append("3", 1).ok());
+ auto blob_array_data = blob_builder.Finish().ValueOrDie();
+
+ auto raw_struct_array =
+ arrow::StructArray::Make({int_array, blob_array_data, string_array},
schema->fields())
+ .ValueOrDie();
+
+ std::shared_ptr<arrow::StructArray> struct_array =
+ std::static_pointer_cast<arrow::StructArray>(raw_struct_array);
+
+ ASSERT_OK_AND_ASSIGN(auto separated,
BlobUtils::SeparateBlobArray(struct_array));
+
+ std::shared_ptr<arrow::DataType> expected_main_type =
arrow::struct_({int_field, string_field});
+ ASSERT_TRUE(separated.main_array->type()->Equals(*expected_main_type));
+ ASSERT_EQ(separated.main_array->num_fields(), 2);
+ ASSERT_TRUE(separated.main_array->field(0)->Equals(*int_array));
+ ASSERT_TRUE(separated.main_array->field(1)->Equals(*string_array));
+
+ std::shared_ptr<arrow::DataType> expected_blob_type =
arrow::struct_({blob_field});
+ ASSERT_TRUE(separated.blob_array->type()->Equals(*expected_blob_type));
+ ASSERT_EQ(separated.blob_array->num_fields(), 1);
+ ASSERT_TRUE(separated.blob_array->field(0)->Equals(*blob_array_data));
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/data/decimal.cpp
b/src/paimon/common/data/decimal.cpp
new file mode 100644
index 0000000..c05982c
--- /dev/null
+++ b/src/paimon/common/data/decimal.cpp
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/data/decimal.h"
+
+#include <algorithm>
+#include <cmath>
+#include <cstdlib>
+
+#include "arrow/api.h"
+#include "arrow/scalar.h"
+#include "arrow/util/decimal.h"
+#include "paimon/io/byte_order.h"
+#include "paimon/memory/bytes.h"
+
+namespace paimon {
+const int64_t Decimal::POWERS_OF_TEN[MAX_COMPACT_PRECISION + 1] = {1,
+ 10,
+ 100,
+ 1000,
+ 10000,
+ 100000,
+ 1000000l,
+ 10000000l,
+ 100000000l,
+ 1000000000l,
+
10000000000l,
+
100000000000l,
+
1000000000000l,
+
10000000000000l,
+
100000000000000l,
+
1000000000000000l,
+
10000000000000000l,
+
100000000000000000l,
+
1000000000000000000l};
+const Decimal::int128_t Decimal::INT128_MAXIMUM_VALUE =
+ static_cast<Decimal::int128_t>(0x7fffffffffffffff) << 64 |
0xffffffffffffffff;
+const Decimal::int128_t Decimal::INT128_MINIMUM_VALUE =
+ static_cast<Decimal::int128_t>(0x8000000000000000) << 64;
+
+std::string Decimal::ToString() const {
+ auto type = arrow::decimal128(Precision(), Scale());
+ arrow::Decimal128Scalar scalar(arrow::Decimal128(HighBits(), LowBits()),
type);
+ return scalar.ToString();
+}
+
+std::vector<char> Decimal::ToUnscaledBytes() const {
+ bool positive = value_ >= 0;
+ int32_t valid_bytes = 0;
+ if (positive) {
+ int32_t leading_zero_bytes = count_leading_zero_bytes(value_);
+ valid_bytes = sizeof(value_) - leading_zero_bytes;
+ } else {
+ int32_t leading_all_ones_bytes = count_leading_all_ones_bytes(value_);
+ valid_bytes = sizeof(value_) - leading_all_ones_bytes;
+ }
+ if (valid_bytes == 0) {
+ // if value_ == 0, return one byte with 0;
+ // if value_ == 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF, return one byte
with 0xFF
+ valid_bytes = 1;
+ } else {
+ // java BigInteger use highest significant bit to determine if the
number is positive or
+ // negative, e.g., a positive BigInteger 0xFF, will return two bytes
[0x00, 0xFF]
+ bool highest_significant_bit =
+ (static_cast<Decimal::uint128_t>(value_) >> ((valid_bytes - 1) *
8)) & 0x80;
+ if ((positive && highest_significant_bit) || (!positive &&
!highest_significant_bit)) {
+ valid_bytes += 1;
+ }
+ }
+ std::vector<char> bytes(valid_bytes);
+ memcpy(bytes.data(), &value_, valid_bytes);
+ if (SystemByteOrder() == ByteOrder::PAIMON_LITTLE_ENDIAN) {
+ std::reverse(bytes.data(), bytes.data() + bytes.size());
+ }
+ return bytes;
+}
+
+Decimal Decimal::FromUnscaledBytes(int32_t precision, int32_t scale, Bytes*
bytes) {
+ if (SystemByteOrder() == ByteOrder::PAIMON_LITTLE_ENDIAN) {
+ std::reverse(bytes->data(), bytes->data() + bytes->size());
+ }
+ Decimal::int128_t value = 0;
+ for (size_t i = 0; i < bytes->size(); ++i) {
+ value |=
static_cast<Decimal::uint128_t>(static_cast<uint8_t>((*bytes)[i])) << (8 * i);
+ }
+ // for negative
+ if ((*bytes)[bytes->size() - 1] & 0x80) {
+ for (size_t i = bytes->size(); i < sizeof(Decimal::int128_t); ++i) {
+ value |= static_cast<Decimal::uint128_t>(0xFFull) << (8 * i);
+ }
+ }
+ return Decimal(precision, scale, value);
+}
+
+int32_t Decimal::clz_u128(uint128_t u) {
+ uint64_t hi = u >> 64;
+ uint64_t lo = u;
+ int32_t retval[3] = {__builtin_clzll(hi), __builtin_clzll(lo) + 64, 128};
+ int32_t idx = !hi + ((!lo) & (!hi));
+ return retval[idx];
+}
+
+int32_t Decimal::count_leading_zero_bytes(uint128_t u) {
+ if (u == 0) {
+ return sizeof(u);
+ }
+ int32_t leading_zeros = clz_u128(u);
+ return leading_zeros / 8;
+}
+
+int32_t Decimal::count_leading_all_ones_bytes(uint128_t u) {
+ if (u == 0) {
+ return 0;
+ }
+ int32_t count = 0;
+ for (int32_t i = sizeof(uint128_t) - 1; i >= 0; i--) {
+ if (((u >> (i * 8)) & 0xFF) == 0xFF) {
+ count++;
+ } else {
+ break;
+ }
+ }
+ return count;
+}
+
+Decimal::int128_t Decimal::DownScaleInt128(Decimal::int128_t value, int32_t
scale) {
+ while (scale > 0) {
+ int32_t step = std::min(std::abs(scale), MAX_COMPACT_PRECISION);
+ value /= POWERS_OF_TEN[step];
+ scale -= step;
+ }
+ return value;
+}
+
+Decimal::int128_t Decimal::ScaleInt128(Decimal::int128_t value, int32_t scale,
bool* overflow) {
+ *overflow = false;
+ while (scale > 0) {
+ int32_t step = std::min(scale, MAX_COMPACT_PRECISION);
+ if (value > 0 && INT128_MAXIMUM_VALUE / POWERS_OF_TEN[step] < value) {
+ *overflow = true;
+ return INT128_MAXIMUM_VALUE;
+ } else if (value < 0 && INT128_MINIMUM_VALUE / POWERS_OF_TEN[step] >
value) {
+ *overflow = true;
+ return INT128_MINIMUM_VALUE;
+ }
+
+ value *= POWERS_OF_TEN[step];
+ scale -= step;
+ }
+ return value;
+}
+
+int32_t Decimal::CompareTo(const Decimal& other) const {
+ auto l_value = value_;
+ auto l_scale = scale_;
+ auto r_value = other.value_;
+ auto r_scale = other.scale_;
+
+ bool l_positive = l_value >= 0;
+ bool r_positive = r_value >= 0;
+ if (l_positive && !r_positive) {
+ return 1;
+ } else if (!l_positive && r_positive) {
+ return -1;
+ }
+
+ // compare integral parts
+ Decimal::int128_t l_integral = DownScaleInt128(l_value, l_scale);
+ Decimal::int128_t r_integral = DownScaleInt128(r_value, r_scale);
+
+ if (l_integral < r_integral) {
+ return -1;
+ } else if (l_integral > r_integral) {
+ return 1;
+ }
+
+ // integral parts are equal, continue comparing fractional parts
+ // unnecessary to check overflow here because the scaled number will not
+ // exceed original ones
+ bool overflow = false, positive = l_value >= 0;
+ l_value -= ScaleInt128(l_integral, l_scale, &overflow);
+ r_value -= ScaleInt128(r_integral, r_scale, &overflow);
+
+ int32_t diff = l_scale - r_scale;
+ if (diff > 0) {
+ r_value = ScaleInt128(r_value, diff, &overflow);
+ if (overflow) {
+ return positive ? -1 : 1;
+ }
+ } else {
+ l_value = ScaleInt128(l_value, -diff, &overflow);
+ if (overflow) {
+ return positive ? 1 : -1;
+ }
+ }
+
+ if (l_value < r_value) {
+ return -1;
+ } else if (l_value > r_value) {
+ return 1;
+ } else {
+ return 0;
+ }
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/data/decimal_test.cpp
b/src/paimon/common/data/decimal_test.cpp
new file mode 100644
index 0000000..038b3d3
--- /dev/null
+++ b/src/paimon/common/data/decimal_test.cpp
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/data/decimal.h"
+
+#include <memory>
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/common/utils/decimal_utils.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/io/data_input_stream.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+TEST(DecimalTest, TestSimple) {
+ auto CheckResult = [](const Decimal& decimal, const std::vector<uint8_t>&
bytes) {
+ auto pool = GetDefaultPool();
+ // prepare java bytes
+ auto java_bytes = Bytes::AllocateBytes(bytes.size(), pool.get());
+ memcpy(java_bytes->data(), bytes.data(), bytes.size());
+ // prepare result cpp bytes
+ auto cpp_serialized_bytes = decimal.ToUnscaledBytes();
+
+ // check serialized bytes equal
+ ASSERT_EQ(std::vector<char>(java_bytes->data(), java_bytes->data() +
java_bytes->size()),
+ cpp_serialized_bytes);
+ // check deserialize equal
+ auto decimal2 = Decimal::FromUnscaledBytes(38, 38, java_bytes.get());
+ ASSERT_EQ(decimal, decimal2);
+ };
+ {
+ Decimal decimal(38, 38,
DecimalUtils::StrToInt128("12345678998765432145678").value());
+ std::vector<uint8_t> java_bytes = {0x2, 0x9d, 0x42, 0xb6, 0xa7, 0x2a,
0x9d, 0xc7, 0x7, 0xe};
+ CheckResult(decimal, java_bytes);
+ }
+ {
+ Decimal decimal(38, 38,
DecimalUtils::StrToInt128("-12345678998765432145678").value());
+ std::vector<uint8_t> java_bytes = {0xfd, 0x62, 0xbd, 0x49, 0x58,
+ 0xd5, 0x62, 0x38, 0xf8, 0xf2};
+ CheckResult(decimal, java_bytes);
+ }
+ {
+ Decimal decimal(38, 38, DecimalUtils::StrToInt128("0").value());
+ std::vector<uint8_t> java_bytes = {0x0};
+ CheckResult(decimal, java_bytes);
+ }
+ {
+ Decimal decimal(38, 38, DecimalUtils::StrToInt128("1").value());
+ std::vector<uint8_t> java_bytes = {0x1};
+ CheckResult(decimal, java_bytes);
+ }
+ {
+ Decimal decimal(38, 38, DecimalUtils::StrToInt128("-1").value());
+ std::vector<uint8_t> java_bytes = {0xff};
+ CheckResult(decimal, java_bytes);
+ }
+ {
+ Decimal decimal(38, 38, DecimalUtils::StrToInt128("128").value());
+ std::vector<uint8_t> java_bytes = {0x0, 0x80};
+ CheckResult(decimal, java_bytes);
+ }
+ {
+ Decimal decimal(38, 38, Decimal::INT128_MINIMUM_VALUE);
+ std::vector<uint8_t> java_bytes = {0x80, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00};
+ CheckResult(decimal, java_bytes);
+ }
+ {
+ Decimal decimal(38, 38, Decimal::INT128_MAXIMUM_VALUE);
+ std::vector<uint8_t> java_bytes = {0x7f, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff,
+ 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff};
+ CheckResult(decimal, java_bytes);
+ }
+}
+
+TEST(DecimalTest, TestCompatibleWithJava) {
+ auto pool = GetDefaultPool();
+ auto file_system = std::make_unique<LocalFileSystem>();
+ auto file_name = paimon::test::GetDataDir() + "/decimal_bytes.data";
+ uint64_t file_length =
file_system->GetFileStatus(file_name).value()->GetLen();
+ ASSERT_GT(file_length, 0);
+ ASSERT_OK_AND_ASSIGN(auto input_stream, file_system->Open(file_name));
+ auto data_bytes = Bytes::AllocateBytes(file_length, pool.get());
+ ASSERT_OK(input_stream->Read(data_bytes->data(), file_length));
+ auto byte_array_input_stream =
+ std::make_shared<ByteArrayInputStream>(data_bytes->data(),
file_length);
+ DataInputStream data_input_stream(byte_array_input_stream);
+ for (int32_t i = 0; i < 2000; i++) {
+ // read decimal str
+ ASSERT_OK_AND_ASSIGN(int32_t decimal_str_len,
data_input_stream.ReadValue<int32_t>());
+ auto decimal_str = Bytes::AllocateBytes(decimal_str_len, pool.get());
+ ASSERT_OK(data_input_stream.ReadBytes(decimal_str.get()));
+
+ // read decimal serialized bytes from java
+ ASSERT_OK_AND_ASSIGN(int32_t decimal_bytes_len,
data_input_stream.ReadValue<int32_t>());
+ auto decimal_bytes = Bytes::AllocateBytes(decimal_bytes_len,
pool.get());
+ ASSERT_OK(data_input_stream.ReadBytes(decimal_bytes.get()));
+
+ // check result
+ auto str = std::string(decimal_str->data(), decimal_str->size());
+ Decimal decimal(/*precision=*/29, /*scale=*/29,
DecimalUtils::StrToInt128(str).value());
+ auto serialized_bytes = decimal.ToUnscaledBytes();
+ ASSERT_EQ(
+ std::vector<char>(decimal_bytes->data(), decimal_bytes->data() +
decimal_bytes->size()),
+ serialized_bytes);
+ auto decimal2 =
+ Decimal::FromUnscaledBytes(/*precision=*/29, /*scale=*/29,
decimal_bytes.get());
+ ASSERT_EQ(decimal, decimal2);
+ }
+}
+
+TEST(DecimalTest, TestCompareTo) {
+ auto CheckResult = [](const Decimal& decimal1, const Decimal& decimal2) {
+ ASSERT_FALSE(decimal1 < decimal1);
+ ASSERT_FALSE(decimal1 > decimal1);
+ ASSERT_EQ(decimal1, decimal1);
+
+ ASSERT_EQ(decimal1.CompareTo(decimal2), -1);
+ ASSERT_LT(decimal1, decimal2);
+ ASSERT_EQ(decimal2.CompareTo(decimal1), 1);
+ ASSERT_GT(decimal2, decimal1);
+ auto decimal3 = decimal1;
+ ASSERT_EQ(decimal3.CompareTo(decimal1), 0);
+ ASSERT_EQ(decimal3, decimal1);
+
+ Decimal negative_decimal1(decimal1.Precision(), decimal1.Scale(),
-decimal1.Value());
+ Decimal negative_decimal2(decimal2.Precision(), decimal2.Scale(),
-decimal2.Value());
+ ASSERT_EQ(negative_decimal1.CompareTo(negative_decimal2), 1);
+ ASSERT_GT(negative_decimal1, negative_decimal2);
+ ASSERT_EQ(negative_decimal2.CompareTo(negative_decimal1), -1);
+ ASSERT_LT(negative_decimal2, negative_decimal1);
+ auto negative_decimal3 = negative_decimal1;
+ ASSERT_EQ(negative_decimal3.CompareTo(negative_decimal1), 0);
+ ASSERT_EQ(negative_decimal3, negative_decimal1);
+ };
+
+ auto CheckEqual = [](const Decimal& decimal1, const Decimal& decimal2) {
+ ASSERT_EQ(decimal1.CompareTo(decimal2), 0);
+ ASSERT_EQ(decimal2.CompareTo(decimal1), 0);
+
+ Decimal negative_decimal1(decimal1.Precision(), decimal1.Scale(),
-decimal1.Value());
+ Decimal negative_decimal2(decimal2.Precision(), decimal2.Scale(),
-decimal2.Value());
+ ASSERT_EQ(negative_decimal1.CompareTo(negative_decimal2), 0);
+ ASSERT_EQ(negative_decimal2.CompareTo(negative_decimal1), 0);
+ };
+
+ // same scales
+ {
+ Decimal decimal1(23, 0, DecimalUtils::StrToInt128("99").value());
+ Decimal decimal2(23, 0, DecimalUtils::StrToInt128("100").value());
+ CheckResult(decimal1, decimal2);
+ }
+ {
+ Decimal decimal1(23, 5, DecimalUtils::StrToInt128("34543").value());
+ Decimal decimal2(23, 5, DecimalUtils::StrToInt128("4324324").value());
+ CheckResult(decimal1, decimal2);
+ }
+ {
+ Decimal decimal1(23, 15,
DecimalUtils::StrToInt128("345345435432").value());
+ Decimal decimal2(23, 15,
DecimalUtils::StrToInt128("345344425435432").value());
+ CheckResult(decimal1, decimal2);
+ }
+ {
+ Decimal decimal1(23, 20, DecimalUtils::StrToInt128("5").value());
+ Decimal decimal2(23, 20, DecimalUtils::StrToInt128("50").value());
+ CheckResult(decimal1, decimal2);
+ }
+
+ // different scales
+ {
+ Decimal decimal1(23, 4, DecimalUtils::StrToInt128("10000").value());
+ Decimal decimal2(23, 3, DecimalUtils::StrToInt128("10000").value());
+ CheckResult(decimal1, decimal2);
+ }
+ {
+ Decimal decimal1(23, 2, DecimalUtils::StrToInt128("111").value());
+ Decimal decimal2(23, 3, DecimalUtils::StrToInt128("1111").value());
+ CheckResult(decimal1, decimal2);
+ }
+ {
+ Decimal decimal1(23, 5, DecimalUtils::StrToInt128("999999").value());
+ Decimal decimal2(23, 5, DecimalUtils::StrToInt128("9999999").value());
+ CheckResult(decimal1, decimal2);
+ }
+ {
+ Decimal decimal1(23, 1, DecimalUtils::StrToInt128("100").value());
+ Decimal decimal2(23, 0, DecimalUtils::StrToInt128("99").value());
+ CheckResult(decimal1, decimal2);
+ }
+
+ // same integral parts
+ {
+ Decimal decimal1(23, 0, DecimalUtils::StrToInt128("99999").value());
+ Decimal decimal2(23, 1, DecimalUtils::StrToInt128("999999").value());
+ CheckResult(decimal1, decimal2);
+ }
+ {
+ Decimal decimal1(23, 3, DecimalUtils::StrToInt128("12345123").value());
+ Decimal decimal2(23, 5,
DecimalUtils::StrToInt128("1234553432").value());
+ CheckResult(decimal1, decimal2);
+ }
+
+ // equal numbers
+ {
+ Decimal decimal1(23, 3, DecimalUtils::StrToInt128("100000").value());
+ Decimal decimal2(23, 0, DecimalUtils::StrToInt128("100").value());
+ CheckEqual(decimal1, decimal2);
+ }
+ {
+ Decimal decimal1(23, 3, DecimalUtils::StrToInt128("100000").value());
+ Decimal decimal2(23, 3, DecimalUtils::StrToInt128("100000").value());
+ CheckEqual(decimal1, decimal2);
+ }
+ {
+ Decimal decimal1(23, 10, DecimalUtils::StrToInt128("1").value());
+ Decimal decimal2(23, 11, DecimalUtils::StrToInt128("10").value());
+ CheckEqual(decimal1, decimal2);
+ }
+
+ // large scales (>18)
+ {
+ Decimal decimal1(128, 35, DecimalUtils::StrToInt128("99").value());
+ Decimal decimal2(128, 35, DecimalUtils::StrToInt128("100").value());
+ CheckResult(decimal1, decimal2);
+ }
+ {
+ Decimal decimal1(
+ 128, 29,
DecimalUtils::StrToInt128("12345678999999999999999999999999999998").value());
+ Decimal decimal2(
+ 128, 30,
DecimalUtils::StrToInt128("123456789999999999999999999999999999999").value());
+ CheckResult(decimal1, decimal2);
+ }
+ {
+ Decimal decimal1(
+ 128, 30,
DecimalUtils::StrToInt128("123456789999999999999999999999999999900").value());
+ Decimal decimal2(
+ 128, 29,
DecimalUtils::StrToInt128("12345678999999999999999999999999999990").value());
+ CheckEqual(decimal1, decimal2);
+ }
+
+ // minimum and maximum
+ {
+ Decimal decimal1(128, 39, Decimal::INT128_MAXIMUM_VALUE);
+ Decimal decimal2(
+ 128, 39,
DecimalUtils::StrToInt128("170141183460469231731687303715884105727").value());
+ CheckEqual(decimal1, decimal2);
+ }
+ {
+ Decimal decimal1(128, 39, Decimal::INT128_MINIMUM_VALUE);
+ Decimal decimal2(
+ 128, 39,
DecimalUtils::StrToInt128("-170141183460469231731687303715884105728").value());
+ CheckEqual(decimal1, decimal2);
+ }
+
+ // fractional overflow
+ {
+ Decimal decimal1(128, 39, Decimal::INT128_MAXIMUM_VALUE);
+ Decimal decimal2(
+ 128, 38,
DecimalUtils::StrToInt128("99999999999999999999999999999999999999").value());
+ CheckResult(decimal1, decimal2);
+ }
+ {
+ Decimal decimal1(
+ 128, 38,
DecimalUtils::StrToInt128("-99999999999999999999999999999999999999").value());
+ Decimal decimal2(128, 39, Decimal::INT128_MINIMUM_VALUE);
+ CheckResult(decimal1, decimal2);
+ }
+}
+} // namespace paimon::test
diff --git a/src/paimon/common/data/timestamp.cpp
b/src/paimon/common/data/timestamp.cpp
new file mode 100644
index 0000000..3f4a313
--- /dev/null
+++ b/src/paimon/common/data/timestamp.cpp
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/data/timestamp.h"
+
+#include <ctime>
+#include <iomanip>
+#include <sstream>
+
+namespace paimon {
+const int32_t Timestamp::DEFAULT_PRECISION = 6;
+const int32_t Timestamp::MILLIS_PRECISION = 3;
+const int32_t Timestamp::MAX_PRECISION = 9;
+const int32_t Timestamp::MIN_PRECISION = 0;
+const int32_t Timestamp::MAX_COMPACT_PRECISION = 3;
+
+int64_t Timestamp::ToMicrosecond() const {
+ int64_t MICROS_PER_MILLIS = 1000l;
+ int64_t NANOS_PER_MICROS = 1000l;
+ int64_t micro = millisecond_ * MICROS_PER_MILLIS;
+ return micro + nano_of_millisecond_ / NANOS_PER_MICROS;
+}
+
+std::string Timestamp::ToString() const {
+ time_t seconds = millisecond_ / 1000;
+ std::stringstream out;
+ int64_t ns = (millisecond_ % 1000) * 1000000l + nano_of_millisecond_;
+ if (ns < 0) {
+ seconds -= 1;
+ ns += 1000000000l;
+ }
+
+ std::tm tm_info;
+ ::gmtime_r(&seconds, &tm_info);
+ out << std::put_time(&tm_info, "%Y-%m-%d %H:%M:%S") << '.' <<
std::setfill('0') << std::setw(9)
+ << ns;
+ // for year with less than 4 digits, year str is not 4 digits in
put_time(), e.g., year 1 is "1"
+ // rather than "0001"
+ static const int32_t MAX_STR_LENGTH = 29;
+ std::string ret = out.str();
+ if (ret.length() < MAX_STR_LENGTH) {
+ ret.insert(0, MAX_STR_LENGTH - ret.length(), '0');
+ }
+ return ret;
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/data/timestamp_test.cpp
b/src/paimon/common/data/timestamp_test.cpp
new file mode 100644
index 0000000..886f087
--- /dev/null
+++ b/src/paimon/common/data/timestamp_test.cpp
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/data/timestamp.h"
+
+#include "gtest/gtest.h"
+
+namespace paimon::test {
+
+class TimestampTest : public ::testing::Test {
+ protected:
+ void SetUp() override {
+ timestamp_ = Timestamp(1622547800000, 123456); // 2021-06-01
11:43:20.000123456
+ }
+
+ Timestamp timestamp_;
+};
+
+TEST_F(TimestampTest, GetMillisecond) {
+ ASSERT_EQ(timestamp_.GetMillisecond(), 1622547800000);
+}
+
+TEST_F(TimestampTest, GetNanoOfMillisecond) {
+ ASSERT_EQ(timestamp_.GetNanoOfMillisecond(), 123456);
+}
+
+TEST_F(TimestampTest, ToNanosecond) {
+ ASSERT_EQ(timestamp_.ToNanosecond(), 1622547800000123456ll);
+}
+
+TEST_F(TimestampTest, FromEpochMillis) {
+ Timestamp ts1 = Timestamp::FromEpochMillis(1622547800000);
+ ASSERT_EQ(ts1.GetMillisecond(), 1622547800000);
+ ASSERT_EQ(ts1.GetNanoOfMillisecond(), 0);
+
+ Timestamp ts2 = Timestamp::FromEpochMillis(1622547800000, 123456);
+ ASSERT_EQ(ts2.GetMillisecond(), 1622547800000);
+ ASSERT_EQ(ts2.GetNanoOfMillisecond(), 123456);
+}
+
+TEST_F(TimestampTest, ToMillisTimestamp) {
+ Timestamp ts = timestamp_.ToMillisTimestamp();
+ ASSERT_EQ(ts.GetMillisecond(), 1622547800000);
+ ASSERT_EQ(ts.GetNanoOfMillisecond(), 0);
+}
+
+TEST_F(TimestampTest, IsCompact) {
+ ASSERT_TRUE(Timestamp::IsCompact(3));
+ ASSERT_FALSE(Timestamp::IsCompact(6));
+}
+
+TEST_F(TimestampTest, EqualityOperator) {
+ Timestamp ts1(1622547800000, 123456);
+ Timestamp ts2(1622547800000, 123456);
+ Timestamp ts3(1622547800000, 654321);
+ ASSERT_EQ(ts1, ts1);
+ ASSERT_EQ(ts1, ts2);
+ ASSERT_NE(ts1, ts3);
+}
+
+TEST_F(TimestampTest, LessThanOperator) {
+ Timestamp ts1(1622547800000, 123456);
+ Timestamp ts2(1622547800000, 654321);
+ Timestamp ts3(1622547800001, 123456);
+ ASSERT_LT(ts1, ts2);
+ ASSERT_LT(ts1, ts3);
+ ASSERT_FALSE(ts2 < ts1);
+ ASSERT_FALSE(ts3 < ts1);
+}
+
+TEST_F(TimestampTest, ToString) {
+ ASSERT_EQ(timestamp_.ToString(), "2021-06-01 11:43:20.000123456");
+ ASSERT_EQ(Timestamp(-1, 0).ToString(), "1969-12-31 23:59:59.999000000");
+ ASSERT_EQ(Timestamp(-62109569749000l, 0).ToString(), "0001-10-29
05:44:11.000000000");
+}
+
+TEST_F(TimestampTest, TestToMicrosecond) {
+ {
+ Timestamp ts(1622547800000, 123456);
+ ASSERT_EQ(1622547800000123l, ts.ToMicrosecond());
+ }
+ {
+ Timestamp ts(-16225478, 123456);
+ ASSERT_EQ(-16225477877, ts.ToMicrosecond());
+ }
+}
+} // namespace paimon::test
diff --git a/src/paimon/common/logging/logging.cpp
b/src/paimon/common/logging/logging.cpp
new file mode 100644
index 0000000..5e8a3f1
--- /dev/null
+++ b/src/paimon/common/logging/logging.cpp
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/logging.h"
+
#include <cerrno>
#include <cstdarg>
#include <cstdio>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <string>
#include <utility>
#include <vector>

#include "glog/log_severity.h"
#include "glog/logging.h"
#include "glog/raw_logging.h"
+
+namespace paimon {
+
+static std::optional<Logger::LoggerCreator>& getLoggerCreator() {
+ static std::optional<Logger::LoggerCreator> _loggerCreator;
+ return _loggerCreator;
+}
+
// Reader/writer lock taken by RegisterLogger and GetLogger around the registered creator
// and the one-time glog initialization.
static std::shared_mutex& getRegistryLock() {
    static std::shared_mutex registry_lock;
    return registry_lock;
}
+
+void Logger::RegisterLogger(LoggerCreator creator) {
+ std::unique_lock<std::shared_mutex> lock(getRegistryLock());
+ getLoggerCreator() = creator;
+}
+
+static google::LogSeverity ToGlogLevel(PaimonLogLevel level) {
+ switch (level) {
+ case PAIMON_LOG_LEVEL_DEBUG:
+ return google::GLOG_INFO;
+ case PAIMON_LOG_LEVEL_INFO:
+ return google::GLOG_INFO;
+ case PAIMON_LOG_LEVEL_WARN:
+ return google::GLOG_WARNING;
+ case PAIMON_LOG_LEVEL_ERROR:
+ return google::GLOG_ERROR;
+ case PAIMON_LOG_LEVEL_NONE:
+ case PAIMON_LOG_LEVEL_MAX:
+ default:
+ return google::GLOG_INFO;
+ }
+}
+
+class GlogAdaptor : public Logger {
+ public:
+ void LogV(PaimonLogLevel level, const char* fname, int lineno, const char*
function,
+ const char* fmt, ...) override {
+ va_list args;
+ va_start(args, fmt);
+ google::RawLog__(ToGlogLevel(level), fname, lineno, fmt, args);
+ va_end(args);
+ }
+
+ bool IsLevelEnabled(PaimonLogLevel level) const override {
+ return true;
+ }
+};
+
+std::unique_ptr<Logger> Logger::GetLogger(const std::string& path) {
+ auto& creator = getLoggerCreator();
+ if (creator) {
+ std::shared_lock<std::shared_mutex> lock(getRegistryLock());
+ return creator.value()(path);
+ }
+ std::unique_lock<std::shared_mutex> ulock(getRegistryLock());
+ if (!google::IsGoogleLoggingInitialized()) {
+ google::InitGoogleLogging(program_invocation_name);
+ }
+ return std::make_unique<GlogAdaptor>();
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/logging/logging_test.cpp
b/src/paimon/common/logging/logging_test.cpp
new file mode 100644
index 0000000..8b6e3f6
--- /dev/null
+++ b/src/paimon/common/logging/logging_test.cpp
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "paimon/logging.h"
+
+#include <future>
+#include <random>
+#include <thread>
+
+#include "paimon/common/executor/future.h"
+#include "paimon/executor.h"
+#include "paimon/testing/utils/testharness.h"
+namespace paimon::test {
+TEST(LoggerTest, TestMultiThreadGetLogger) {
+ auto executor = CreateDefaultExecutor(/*thread_count=*/4);
+ auto get_logger = []() {
+ auto logger = Logger::GetLogger("my_log");
+ ASSERT_TRUE(logger);
+ };
+
+ std::vector<std::future<void>> futures;
+ for (int32_t i = 0; i < 1000; ++i) {
+ futures.push_back(Via(executor.get(), get_logger));
+ }
+ Wait(futures);
+}
+} // namespace paimon::test