This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 3adf7b3 feat: add deletion vector support with bitmap implementation
and index file I/O (#69)
3adf7b3 is described below
commit 3adf7b3ba197faec3e2617fa396f0796241840b3
Author: Yonghao Fang <[email protected]>
AuthorDate: Wed Jun 10 10:48:37 2026 +0800
feat: add deletion vector support with bitmap implementation and index file
I/O (#69)
---
.../apply_deletion_vector_batch_reader.h | 110 ++++++++++
.../apply_deletion_vector_batch_reader_test.cpp | 169 +++++++++++++++
.../deletionvectors/bitmap64_deletion_vector.h | 30 +++
.../deletionvectors/bitmap_deletion_vector.cpp | 103 +++++++++
.../core/deletionvectors/bitmap_deletion_vector.h | 87 ++++++++
.../bitmap_deletion_vector_test.cpp | 239 +++++++++++++++++++++
.../core/deletionvectors/bucketed_dv_maintainer.h | 131 +++++++++++
.../bucketed_dv_maintainer_test.cpp | 156 ++++++++++++++
.../core/deletionvectors/deletion_file_writer.cpp | 67 ++++++
.../core/deletionvectors/deletion_file_writer.h | 68 ++++++
.../deletionvectors/deletion_file_writer_test.cpp | 121 +++++++++++
.../core/deletionvectors/deletion_vector.cpp | 135 ++++++++++++
src/paimon/core/deletionvectors/deletion_vector.h | 130 +++++++++++
.../deletion_vector_index_file_writer.cpp | 44 ++++
.../deletion_vector_index_file_writer.h | 53 +++++
.../deletion_vector_index_file_writer_test.cpp | 125 +++++++++++
.../core/deletionvectors/deletion_vector_test.cpp | 166 ++++++++++++++
.../deletion_vectors_index_file.cpp | 90 ++++++++
.../deletionvectors/deletion_vectors_index_file.h | 68 ++++++
.../deletion_vectors_index_file_test.cpp | 190 ++++++++++++++++
20 files changed, 2282 insertions(+)
diff --git a/src/paimon/core/deletionvectors/apply_deletion_vector_batch_reader.h b/src/paimon/core/deletionvectors/apply_deletion_vector_batch_reader.h
new file mode 100644
index 0000000..ed20c57
--- /dev/null
+++ b/src/paimon/core/deletionvectors/apply_deletion_vector_batch_reader.h
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+#include <cassert>
+#include <cstdint>
+#include <memory>
+#include <utility>
+
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/c/helpers.h"
+#include "paimon/common/reader/reader_utils.h"
+#include "paimon/core/deletionvectors/deletion_vector.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/reader/file_batch_reader.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/utils/roaring_bitmap32.h"
+
+namespace paimon {
+class Metrics;
+
+/// `FileBatchReader` decorator that filters out rows marked as deleted by a `DeletionVector`.
+///
+/// Each batch returned by the wrapped reader has its selection bitmap intersected with the rows
+/// the deletion vector reports as valid. Batches left with an empty bitmap are released and
+/// skipped, so a non-EOF batch returned from `NextBatchWithBitmap()` always selects at least one
+/// row. Only the bitmap-based read path is supported: `NextBatch()` and `SetReadSchema()` return
+/// `Status::Invalid`.
+class ApplyDeletionVectorBatchReader : public FileBatchReader {
+ public:
+    /// @param reader Underlying reader; must not be null.
+    /// @param deletion_vector Deletion vector addressed by row number within the file read by
+    ///        `reader`; must not be null. Taken by value so callers can move ownership in
+    ///        without an extra reference-count round trip.
+    ApplyDeletionVectorBatchReader(std::unique_ptr<FileBatchReader>&& reader,
+                                   std::shared_ptr<DeletionVector> deletion_vector)
+        : reader_(std::move(reader)), deletion_vector_(std::move(deletion_vector)) {
+        assert(reader_);
+        assert(deletion_vector_);
+    }
+
+    /// Not supported; this reader only produces batches through `NextBatchWithBitmap()`.
+    Result<ReadBatch> NextBatch() override {
+        return Status::Invalid(
+            "paimon inner reader ApplyDeletionVectorBatchReader should use NextBatchWithBitmap");
+    }
+
+    /// Returns the next batch that still selects at least one non-deleted row, or the EOF
+    /// batch of the underlying reader.
+    Result<ReadBatchWithBitmap> NextBatchWithBitmap() override {
+        while (true) {
+            PAIMON_ASSIGN_OR_RAISE(ReadBatchWithBitmap batch_with_bitmap,
+                                   reader_->NextBatchWithBitmap());
+            if (BatchReader::IsEofBatch(batch_with_bitmap)) {
+                return batch_with_bitmap;
+            }
+            auto& [batch, bitmap] = batch_with_bitmap;
+            PAIMON_ASSIGN_OR_RAISE(RoaringBitmap32 valid_bitmap, Filter(batch.first->length));
+            bitmap &= valid_bitmap;
+            if (bitmap.IsEmpty()) {
+                // No selected row survives the deletion vector: free the batch and keep reading
+                // instead of handing an empty batch to the caller.
+                ReaderUtils::ReleaseReadBatch(std::move(batch));
+                continue;
+            }
+            return batch_with_bitmap;
+        }
+    }
+
+    void Close() override {
+        reader_->Close();
+    }
+
+    std::shared_ptr<Metrics> GetReaderMetrics() const override {
+        return reader_->GetReaderMetrics();
+    }
+
+    Result<std::unique_ptr<::ArrowSchema>> GetFileSchema() const override {
+        return reader_->GetFileSchema();
+    }
+
+    /// Not supported; the read schema must be configured on the wrapped reader instead.
+    Status SetReadSchema(::ArrowSchema* read_schema, const std::shared_ptr<Predicate>& predicate,
+                         const std::optional<RoaringBitmap32>& selection_bitmap) override {
+        return Status::Invalid("ApplyDeletionVectorBatchReader does not support SetReadSchema");
+    }
+
+    Result<uint64_t> GetPreviousBatchFirstRowNumber() const override {
+        return reader_->GetPreviousBatchFirstRowNumber();
+    }
+
+    Result<uint64_t> GetNumberOfRows() const override {
+        return reader_->GetNumberOfRows();
+    }
+
+    bool SupportPreciseBitmapSelection() const override {
+        return reader_->SupportPreciseBitmapSelection();
+    }
+
+ private:
+    /// Asks the deletion vector which of the `batch_size` rows starting at the first row number
+    /// of the batch just read are still valid; the result is intersected with the batch's
+    /// selection bitmap by the caller.
+    ///
+    /// `batch_size` is the int64 Arrow array length. The deletion-vector lookup takes an int32
+    /// count, so out-of-range lengths are rejected instead of being silently truncated.
+    Result<RoaringBitmap32> Filter(int64_t batch_size) const {
+        if (batch_size < 0 || batch_size > INT32_MAX) {
+            return Status::Invalid("ApplyDeletionVectorBatchReader batch size out of range: ",
+                                   batch_size);
+        }
+        PAIMON_ASSIGN_OR_RAISE(uint64_t previous_batch_first_row_number,
+                               reader_->GetPreviousBatchFirstRowNumber());
+        return deletion_vector_->IsValid(previous_batch_first_row_number,
+                                         static_cast<int32_t>(batch_size));
+    }
+
+ private:
+    std::unique_ptr<FileBatchReader> reader_;
+    std::shared_ptr<DeletionVector> deletion_vector_;
+};
+}  // namespace paimon
diff --git a/src/paimon/core/deletionvectors/apply_deletion_vector_batch_reader_test.cpp b/src/paimon/core/deletionvectors/apply_deletion_vector_batch_reader_test.cpp
new file mode 100644
index 0000000..2892022
--- /dev/null
+++ b/src/paimon/core/deletionvectors/apply_deletion_vector_batch_reader_test.cpp
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "paimon/core/deletionvectors/apply_deletion_vector_batch_reader.h"
+
+#include <string>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/reader/prefetch_file_batch_reader_impl.h"
+#include "paimon/executor.h"
+#include "paimon/testing/mock/mock_file_batch_reader.h"
+#include "paimon/testing/mock/mock_file_system.h"
+#include "paimon/testing/mock/mock_format_reader_builder.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+#include "paimon/utils/read_ahead_cache.h"
+
+namespace arrow {
+class Array;
+}  // namespace arrow
+namespace paimon {
+class FileSystem;
+}  // namespace paimon
+
+namespace paimon::test {
+/// Runs `ApplyDeletionVectorBatchReader` over a struct array with a single int32 field "f1".
+/// The boolean test parameter selects whether the underlying reader is wrapped in
+/// `PrefetchFileBatchReaderImpl`.
+class ApplyDeletionVectorBatchReaderTest : public ::testing::Test,
+                                           public ::testing::WithParamInterface<bool> {
+ public:
+    void SetUp() override {
+        int_type_ = arrow::int32();
+        target_type_ = arrow::struct_({arrow::field("f1", int_type_)});
+
+        pool_ = GetDefaultPool();
+        fs_ = std::make_shared<MockFileSystem>();
+        executor_ = CreateDefaultExecutor(/*thread_count=*/2);
+    }
+
+    /// Drains `apply_dv_batch_reader` and compares the collected rows with
+    /// `expected_chunk_array`; a null expectation means the reader must yield no rows.
+    /// Uses fatal assertions, so callers wrap it in `ASSERT_NO_FATAL_FAILURE`.
+    void CheckResult(BatchReader* apply_dv_batch_reader,
+                     const std::shared_ptr<arrow::ChunkedArray>& expected_chunk_array) {
+        ASSERT_OK_AND_ASSIGN(auto result_chunk_array,
+                             ReadResultCollector::CollectResult(apply_dv_batch_reader));
+        if (expected_chunk_array) {
+            // Guard the dereference below: rows were expected but none may have been produced.
+            ASSERT_TRUE(result_chunk_array);
+            ASSERT_EQ(expected_chunk_array->length(), result_chunk_array->length());
+            ASSERT_TRUE(expected_chunk_array->Equals(result_chunk_array));
+        } else {
+            ASSERT_FALSE(result_chunk_array);
+        }
+    }
+
+    /// Builds the input from the JSON `data_str` and applies the deletion vector built from
+    /// `dv_data`, where a 1 marks the row at that index as deleted (as the cases below show).
+    /// Checks the output against `expected_str`; an empty string means no rows are expected.
+    /// Repeated for several underlying batch sizes.
+    void CheckResult(const std::string& data_str, const std::vector<char>& dv_data,
+                     const std::string& expected_str) {
+        auto f1 = arrow::ipc::internal::json::ArrayFromJSON(int_type_, data_str).ValueOrDie();
+        std::shared_ptr<arrow::Array> data =
+            arrow::StructArray::Make({f1}, target_type_->fields()).ValueOrDie();
+
+        int32_t prefetch_batch_count = 3;
+        for (int32_t batch_size : {1, 2, 4, 10}) {
+            SCOPED_TRACE("batch_size=" + std::to_string(batch_size));
+            auto dv = DeletionVector::FromPrimitiveArray(dv_data, pool_.get());
+            // Declared before the readers so it outlives them: its address is handed to
+            // PrefetchFileBatchReaderImpl::Create. NOTE(review): confirm whether Create keeps
+            // the pointer; if it does not, this only extends a cheap mock's lifetime.
+            MockFormatReaderBuilder reader_builder(data, target_type_, batch_size);
+            std::unique_ptr<FileBatchReader> file_batch_reader;
+            bool enable_prefetch = GetParam();
+            if (enable_prefetch) {
+                ASSERT_OK_AND_ASSIGN(
+                    file_batch_reader,
+                    PrefetchFileBatchReaderImpl::Create(
+                        /*data_file_path=*/"DUMMY", &reader_builder, fs_, prefetch_batch_count,
+                        batch_size, prefetch_batch_count * 2,
+                        /*enable_adaptive_prefetch_strategy=*/false, executor_,
+                        /*initialize_read_ranges=*/true,
+                        /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(), pool_));
+            } else {
+                file_batch_reader =
+                    std::make_unique<MockFileBatchReader>(data, target_type_, batch_size);
+            }
+            auto apply_dv_batch_reader = std::make_unique<ApplyDeletionVectorBatchReader>(
+                std::move(file_batch_reader), std::move(dv));
+            if (expected_str.empty()) {
+                ASSERT_NO_FATAL_FAILURE(CheckResult(apply_dv_batch_reader.get(), nullptr));
+            } else {
+                auto expected =
+                    arrow::ipc::internal::json::ArrayFromJSON(int_type_, expected_str).ValueOrDie();
+                std::shared_ptr<arrow::Array> expect_array =
+                    arrow::StructArray::Make({expected}, target_type_->fields()).ValueOrDie();
+                auto expected_chunk_array = std::make_shared<arrow::ChunkedArray>(expect_array);
+                ASSERT_NO_FATAL_FAILURE(
+                    CheckResult(apply_dv_batch_reader.get(), expected_chunk_array));
+            }
+            auto read_metrics = apply_dv_batch_reader->GetReaderMetrics();
+            ASSERT_TRUE(read_metrics);
+            apply_dv_batch_reader->Close();
+        }
+    }
+
+ private:
+    std::shared_ptr<arrow::DataType> int_type_;
+    std::shared_ptr<arrow::DataType> target_type_;
+    std::shared_ptr<MemoryPool> pool_;
+    std::shared_ptr<FileSystem> fs_;
+    std::shared_ptr<Executor> executor_;
+};
+
+TEST_P(ApplyDeletionVectorBatchReaderTest, TestSimple) {
+    std::string data_str = "[10, 11, 12, 13]";
+    {
+        std::vector<char> dv_data = {0, 1, 1, 0};
+        ASSERT_NO_FATAL_FAILURE(CheckResult(data_str, dv_data, "[10, 13]"));
+    }
+    {
+        std::vector<char> dv_data = {1, 0, 0, 1};
+        ASSERT_NO_FATAL_FAILURE(CheckResult(data_str, dv_data, "[11, 12]"));
+    }
+    {
+        std::vector<char> dv_data = {1, 1, 1, 1};
+        // empty result
+        ASSERT_NO_FATAL_FAILURE(CheckResult(data_str, dv_data, ""));
+    }
+    {
+        std::vector<char> dv_data = {0, 0, 0, 0};
+        ASSERT_NO_FATAL_FAILURE(CheckResult(data_str, dv_data, "[10, 11, 12, 13]"));
+    }
+}
+
+TEST_P(ApplyDeletionVectorBatchReaderTest, TestSimple2) {
+    std::string data_str = "[10, 11, 12, 13, 14, 15, 16]";
+    {
+        std::vector<char> dv_data = {0, 1, 1, 0, 1, 0, 0};
+        ASSERT_NO_FATAL_FAILURE(CheckResult(data_str, dv_data, "[10, 13, 15, 16]"));
+    }
+    {
+        std::vector<char> dv_data = {0, 1, 1, 0, 0, 0, 1};
+        ASSERT_NO_FATAL_FAILURE(CheckResult(data_str, dv_data, "[10, 13, 14, 15]"));
+    }
+    {
+        std::vector<char> dv_data = {1, 1, 1, 1, 0, 0, 1};
+        ASSERT_NO_FATAL_FAILURE(CheckResult(data_str, dv_data, "[14, 15]"));
+    }
+    {
+        std::vector<char> dv_data = {1, 1, 1, 1, 1, 1, 1};
+        // empty result
+        ASSERT_NO_FATAL_FAILURE(CheckResult(data_str, dv_data, ""));
+    }
+    {
+        std::vector<char> dv_data = {0, 0, 0, 0, 0, 0, 0};
+        ASSERT_NO_FATAL_FAILURE(CheckResult(data_str, dv_data, "[10, 11, 12, 13, 14, 15, 16]"));
+    }
+}
+INSTANTIATE_TEST_SUITE_P(EnablePrefetch, ApplyDeletionVectorBatchReaderTest,
+                         ::testing::Values(false, true));
+}  // namespace paimon::test
diff --git a/src/paimon/core/deletionvectors/bitmap64_deletion_vector.h b/src/paimon/core/deletionvectors/bitmap64_deletion_vector.h
new file mode 100644
index 0000000..7ec827d
--- /dev/null
+++ b/src/paimon/core/deletionvectors/bitmap64_deletion_vector.h
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include "paimon/core/deletionvectors/deletion_vector.h"
+
+namespace paimon {
+
+/// Stub for the 64-bit roaring bitmap deletion vector format.
+///
+/// Only the format's magic number is defined here; no `DeletionVector` operations are
+/// implemented. NOTE(review): presumably never instantiated — confirm against the pure
+/// virtuals declared in deletion_vector.h.
+class Bitmap64DeletionVector : public DeletionVector {
+ public:
+    /// Magic number of the 64-bit bitmap deletion vector format.
+    static constexpr int32_t MAGIC_NUMBER{1681511377};
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/deletionvectors/bitmap_deletion_vector.cpp b/src/paimon/core/deletionvectors/bitmap_deletion_vector.cpp
new file mode 100644
index 0000000..533d4ac
--- /dev/null
+++ b/src/paimon/core/deletionvectors/bitmap_deletion_vector.cpp
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "paimon/core/deletionvectors/bitmap_deletion_vector.h"
+
+#include "arrow/util/crc32.h"
+#include "fmt/format.h"
+#include "paimon/common/io/memory_segment_output_stream.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/io/data_input_stream.h"
+
+namespace paimon {
+
+/// Writes `[int32 size][payload][int32 crc32]` to `out`, where `payload` is the output of
+/// `SerializeToBytes()` and the checksum covers the payload only. Returns the payload size.
+Result<int32_t> BitmapDeletionVector::SerializeTo(const std::shared_ptr<MemoryPool>& pool,
+                                                  DataOutputStream* out) {
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<Bytes> data, SerializeToBytes(pool));
+    const int64_t size = data->size();
+    // The size prefix is an int32, so larger payloads cannot be framed.
+    if (size < 0 || size > std::numeric_limits<int32_t>::max()) {
+        return Status::Invalid("BitmapDeletionVector serialize size out of range: ", size);
+    }
+    PAIMON_RETURN_NOT_OK(out->WriteValue<int32_t>(static_cast<int32_t>(size)));
+    PAIMON_RETURN_NOT_OK(out->WriteBytes(data));
+    const uint32_t checksum = arrow::internal::crc32(0, data->data(), size);
+    PAIMON_RETURN_NOT_OK(out->WriteValue<int32_t>(static_cast<int32_t>(checksum)));
+    return static_cast<int32_t>(size);
+}
+
+/// Serializes to `MAGIC_NUMBER` (int32) followed by the roaring bitmap's serialized bytes.
+Result<PAIMON_UNIQUE_PTR<Bytes>> BitmapDeletionVector::SerializeToBytes(
+    const std::shared_ptr<MemoryPool>& pool) {
+    std::shared_ptr<Bytes> bitmap_bytes = roaring_bitmap_.Serialize(pool.get());
+    if (bitmap_bytes == nullptr) {
+        // Fails loudly in debug builds; release builds report the error to the caller.
+        assert(bitmap_bytes);
+        return Status::Invalid("roaring bitmap serialize failed");
+    }
+    MemorySegmentOutputStream output(MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool);
+    // NOTE(review): the results of WriteValue/WriteBytes are not checked here; confirm they
+    // cannot fail for this in-memory stream.
+    output.WriteValue<int32_t>(MAGIC_NUMBER);
+    output.WriteBytes(bitmap_bytes);
+    return MemorySegmentUtils::CopyToBytes(output.Segments(), /*offset=*/0, output.CurrentSize(),
+                                           pool.get());
+}
+
+/// Returns `Status::Invalid` for positions the 32-bit bitmap cannot hold: negative positions
+/// (which would wrap around when cast to int32) and positions above
+/// `RoaringBitmap32::MAX_VALUE`.
+Status BitmapDeletionVector::CheckPosition(int64_t position) const {
+    if (position < 0) {
+        return Status::Invalid(fmt::format(
+            "Invalid row position {} for deletion vector, position must be non-negative.",
+            position));
+    }
+    if (position > RoaringBitmap32::MAX_VALUE) {
+        return Status::Invalid(fmt::format(
+            "The file has too many rows, RoaringBitmap32 only supports files with row count "
+            "not exceeding {}.",
+            RoaringBitmap32::MAX_VALUE));
+    }
+    return Status::OK();
+}
+
+/// Builds a deletion vector from a serialized roaring bitmap that has no magic-number prefix.
+Result<PAIMON_UNIQUE_PTR<DeletionVector>> BitmapDeletionVector::DeserializeWithoutMagicNumber(
+    const char* buffer, int32_t length, MemoryPool* pool) {
+    RoaringBitmap32 roaring_bitmap;
+    PAIMON_RETURN_NOT_OK(roaring_bitmap.Deserialize(buffer, length));
+    return pool->AllocateUnique<BitmapDeletionVector>(roaring_bitmap);
+}
+
+/// Parses the output of `SerializeToBytes()`: checks the leading `MAGIC_NUMBER`, then
+/// deserializes the remaining bytes as a roaring bitmap.
+Result<PAIMON_UNIQUE_PTR<DeletionVector>> BitmapDeletionVector::Deserialize(const char* buffer,
+                                                                            int32_t length,
+                                                                            MemoryPool* pool) {
+    // Validate the input before reading from it; this also guarantees the remaining length
+    // passed on below is non-negative.
+    if (buffer == nullptr) {
+        return Status::Invalid("Unable to deserialize deletion vector, buffer is null");
+    }
+    if (length < MAGIC_NUMBER_SIZE_BYTES) {
+        return Status::Invalid(fmt::format(
+            "Unable to deserialize deletion vector, buffer too small: {} bytes", length));
+    }
+    auto in = std::make_shared<ByteArrayInputStream>(buffer, length);
+    DataInputStream input(in);
+    PAIMON_ASSIGN_OR_RAISE(int32_t magic_num, input.ReadValue<int32_t>());
+    if (magic_num != MAGIC_NUMBER) {
+        return Status::Invalid(fmt::format(
+            "Unable to deserialize deletion vector, invalid magic number: {}", magic_num));
+    }
+    return DeserializeWithoutMagicNumber(buffer + MAGIC_NUMBER_SIZE_BYTES,
+                                         length - MAGIC_NUMBER_SIZE_BYTES, pool);
+}
+
+/// Adds every position of `deletion_vector` to this one. Null or empty inputs are no-ops.
+/// Any other input that is not a `BitmapDeletionVector` is rejected with `Status::Invalid`.
+Status BitmapDeletionVector::Merge(const std::shared_ptr<DeletionVector>& deletion_vector) {
+    if (!deletion_vector || deletion_vector->IsEmpty()) {
+        return Status::OK();
+    }
+    auto* other = dynamic_cast<BitmapDeletionVector*>(deletion_vector.get());
+    if (other == nullptr) {
+        return Status::Invalid(
+            "Cannot merge a non-BitmapDeletionVector into a BitmapDeletionVector");
+    }
+    roaring_bitmap_ |= other->roaring_bitmap_;
+    return Status::OK();
+}
+
+}  // namespace paimon
diff --git a/src/paimon/core/deletionvectors/bitmap_deletion_vector.h b/src/paimon/core/deletionvectors/bitmap_deletion_vector.h
new file mode 100644
index 0000000..dd32179
--- /dev/null
+++ b/src/paimon/core/deletionvectors/bitmap_deletion_vector.h
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <memory>
+
+#include "paimon/common/io/data_output_stream.h"
+#include "paimon/core/deletionvectors/deletion_vector.h"
+#include "paimon/utils/roaring_bitmap32.h"
+
+namespace paimon {
+
+/// `DeletionVector` that records deleted row positions in a `RoaringBitmap32`.
+///
+/// Positions are stored as 32-bit values, so only files whose row positions do not exceed
+/// `RoaringBitmap32::MAX_VALUE` are supported. The position-taking methods validate their
+/// argument first and return the validation error instead of touching the bitmap.
+class BitmapDeletionVector : public DeletionVector {
+ public:
+    /// Magic number that prefixes the output of `SerializeToBytes()`.
+    static constexpr int32_t MAGIC_NUMBER = 1581511376;
+    /// Number of bytes occupied by the serialized `MAGIC_NUMBER`.
+    static constexpr int32_t MAGIC_NUMBER_SIZE_BYTES = 4;
+
+    /// Creates a deletion vector that starts out with a copy of `roaring_bitmap`.
+    explicit BitmapDeletionVector(const RoaringBitmap32& roaring_bitmap)
+        : roaring_bitmap_(roaring_bitmap) {}
+
+    /// Marks row `position` as deleted.
+    Status Delete(int64_t position) override {
+        PAIMON_RETURN_NOT_OK(CheckPosition(position));
+        const auto row = static_cast<int32_t>(position);
+        roaring_bitmap_.Add(row);
+        return Status::OK();
+    }
+
+    /// Marks row `position` as deleted and reports whether it was not already deleted.
+    Result<bool> CheckedDelete(int64_t position) override {
+        PAIMON_RETURN_NOT_OK(CheckPosition(position));
+        const auto row = static_cast<int32_t>(position);
+        return roaring_bitmap_.CheckedAdd(row);
+    }
+
+    /// Reports whether row `position` is marked as deleted.
+    Result<bool> IsDeleted(int64_t position) const override {
+        PAIMON_RETURN_NOT_OK(CheckPosition(position));
+        const auto row = static_cast<int32_t>(position);
+        return roaring_bitmap_.Contains(row);
+    }
+
+    bool IsEmpty() const override { return roaring_bitmap_.IsEmpty(); }
+
+    /// Number of deleted positions.
+    int64_t GetCardinality() const override { return roaring_bitmap_.Cardinality(); }
+
+    /// Writes the length-prefixed, checksummed form of `SerializeToBytes()` to `out` and
+    /// returns the payload size.
+    Result<int32_t> SerializeTo(const std::shared_ptr<MemoryPool>& pool,
+                                DataOutputStream* out) override;
+
+    /// Serializes to `MAGIC_NUMBER` followed by the serialized bitmap.
+    Result<PAIMON_UNIQUE_PTR<Bytes>> SerializeToBytes(
+        const std::shared_ptr<MemoryPool>& pool) override;
+
+    /// Adds every position of `deletion_vector` to this one. Null or empty inputs are ignored.
+    /// Any other input that is not a `BitmapDeletionVector` is rejected.
+    Status Merge(const std::shared_ptr<DeletionVector>& deletion_vector) override;
+
+    /// Non-owning view of the underlying bitmap; valid while this object is alive.
+    const RoaringBitmap32* GetBitmap() const { return &roaring_bitmap_; }
+
+    /// Parses the output of `SerializeToBytes()`; fails when the magic number does not match.
+    static Result<PAIMON_UNIQUE_PTR<DeletionVector>> Deserialize(const char* buffer, int32_t length,
+                                                                 MemoryPool* pool);
+
+    /// Parses a serialized bitmap that is not prefixed by `MAGIC_NUMBER`.
+    static Result<PAIMON_UNIQUE_PTR<DeletionVector>> DeserializeWithoutMagicNumber(
+        const char* buffer, int32_t length, MemoryPool* pool);
+
+ private:
+    /// Returns an error for positions the 32-bit bitmap cannot hold.
+    Status CheckPosition(int64_t position) const;
+
+    RoaringBitmap32 roaring_bitmap_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/deletionvectors/bitmap_deletion_vector_test.cpp b/src/paimon/core/deletionvectors/bitmap_deletion_vector_test.cpp
new file mode 100644
index 0000000..ed44586
--- /dev/null
+++ b/src/paimon/core/deletionvectors/bitmap_deletion_vector_test.cpp
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "paimon/core/deletionvectors/bitmap_deletion_vector.h"
+
+#include <set>
+
+#include "fmt/format.h"
+#include "gtest/gtest.h"
+#include "paimon/common/io/memory_segment_output_stream.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/fs/file_system_factory.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// Deleting every even position of [0, 2000) is reflected by the cardinality and by lookups.
+TEST(BitmapDeletionVectorTest, BasicOperations) {
+    RoaringBitmap32 initial;
+    BitmapDeletionVector dv(initial);
+    ASSERT_TRUE(dv.IsEmpty());
+    for (int32_t pos = 0; pos < 2000; pos += 2) {
+        ASSERT_OK(dv.Delete(pos));
+    }
+    ASSERT_EQ(dv.GetCardinality(), 1000);
+    for (int32_t pos = 0; pos < 2000; ++pos) {
+        const bool expect_deleted = (pos % 2 == 0);
+        ASSERT_EQ(dv.IsDeleted(pos).value(), expect_deleted);
+    }
+}
+
+// CheckedDelete reports true only the first time a position is deleted.
+TEST(BitmapDeletionVectorTest, CheckedDelete) {
+    RoaringBitmap32 initial;
+    BitmapDeletionVector dv(initial);
+    const int64_t position = 42;
+    ASSERT_TRUE(dv.CheckedDelete(position).value());
+    ASSERT_FALSE(dv.CheckedDelete(position).value());
+    ASSERT_TRUE(dv.IsDeleted(position).value());
+}
+
+// SerializeToBytes output round-trips through Deserialize.
+TEST(BitmapDeletionVectorTest, SerializeAndDeserialize) {
+    auto pool = GetDefaultPool();
+    RoaringBitmap32 bitmap;
+    for (int32_t pos = 0; pos < 100; pos += 3) {
+        bitmap.Add(pos);
+    }
+    BitmapDeletionVector original(bitmap);
+    ASSERT_OK_AND_ASSIGN(auto serialized, original.SerializeToBytes(pool));
+    ASSERT_OK_AND_ASSIGN(auto restored, BitmapDeletionVector::Deserialize(
+                                            serialized->data(), serialized->size(), pool.get()));
+    for (int32_t pos = 0; pos < 100; ++pos) {
+        ASSERT_EQ(original.IsDeleted(pos).value(), restored->IsDeleted(pos).value());
+    }
+}
+
+// SerializeTo writes a length prefix followed by a payload that Deserialize accepts.
+TEST(BitmapDeletionVectorTest, SerializeToOutputStream) {
+    RoaringBitmap32 bitmap;
+    for (int32_t pos = 0; pos < 50; pos += 2) {
+        bitmap.Add(pos);
+    }
+    BitmapDeletionVector original(bitmap);
+    auto pool = GetDefaultPool();
+    auto dir = UniqueTestDirectory::Create();
+    auto path = PathUtil::JoinPath(dir->Str(), "dv");
+    ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFactory::Get("local", path, {}));
+
+    // Write phase.
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> output_stream,
+                         fs->Create(path, /*overwrite=*/false));
+    DataOutputStream writer(output_stream);
+    ASSERT_OK_AND_ASSIGN(int64_t written_size, original.SerializeTo(pool, &writer));
+    ASSERT_OK(output_stream->Flush());
+    ASSERT_OK(output_stream->Close());
+
+    // Read phase: length prefix, then the payload it describes.
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> input_stream, fs->Open(path));
+    DataInputStream reader(input_stream);
+    ASSERT_OK_AND_ASSIGN(int32_t payload_len, reader.ReadValue<int32_t>());
+    auto payload = Bytes::AllocateBytes(payload_len, pool.get());
+    ASSERT_OK(reader.Read(payload->data(), payload->size()));
+    ASSERT_EQ(payload->size(), written_size);
+    ASSERT_OK_AND_ASSIGN(auto restored, BitmapDeletionVector::Deserialize(
+                                            payload->data(), payload->size(), pool.get()));
+    for (int32_t pos = 0; pos < 50; ++pos) {
+        ASSERT_EQ(original.IsDeleted(pos).value(), restored->IsDeleted(pos).value());
+    }
+}
+
+// Cardinality for empty, contiguous and sparse bitmaps, and after a further Delete.
+TEST(BitmapDeletionVectorTest, GetCardinality) {
+    RoaringBitmap32 empty;
+    BitmapDeletionVector dv_empty(empty);
+    ASSERT_EQ(dv_empty.GetCardinality(), 0);
+
+    RoaringBitmap32 contiguous;
+    for (int32_t pos = 0; pos < 100; ++pos) {
+        contiguous.Add(pos);
+    }
+    BitmapDeletionVector dv_contiguous(contiguous);
+    ASSERT_EQ(dv_contiguous.GetCardinality(), 100);
+
+    RoaringBitmap32 sparse;
+    for (int32_t pos = 0; pos < 1000; pos += 10) {
+        sparse.Add(pos);
+    }
+    BitmapDeletionVector dv_sparse(sparse);
+    ASSERT_EQ(dv_sparse.GetCardinality(), 100);
+
+    RoaringBitmap32 growing;
+    for (int32_t pos = 0; pos < 10; ++pos) {
+        growing.Add(pos);
+    }
+    BitmapDeletionVector dv_growing(growing);
+    ASSERT_EQ(dv_growing.GetCardinality(), 10);
+    ASSERT_OK(dv_growing.Delete(100));
+    ASSERT_EQ(dv_growing.GetCardinality(), 11);
+}
+
+// Positions above RoaringBitmap32::MAX_VALUE are rejected by every position-taking method.
+TEST(BitmapDeletionVectorTest, PositionOutOfRangeShouldFail) {
+    RoaringBitmap32 initial;
+    BitmapDeletionVector dv(initial);
+    const int64_t too_large = static_cast<int64_t>(RoaringBitmap32::MAX_VALUE) + 1;
+
+    ASSERT_NOK_WITH_MSG(dv.Delete(too_large), "too many rows");
+    ASSERT_NOK_WITH_MSG(dv.CheckedDelete(too_large), "too many rows");
+    ASSERT_NOK_WITH_MSG(dv.IsDeleted(too_large), "too many rows");
+}
+
+// A payload with the wrong magic number is refused and the bad value is reported.
+TEST(BitmapDeletionVectorTest, DeserializeShouldRejectInvalidMagicNumber) {
+    auto pool = GetDefaultPool();
+    const int32_t bad_magic = BitmapDeletionVector::MAGIC_NUMBER + 1;
+    MemorySegmentOutputStream output(MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool);
+    output.WriteValue<int32_t>(bad_magic);
+
+    auto payload = MemorySegmentUtils::CopyToBytes(output.Segments(), /*offset=*/0,
+                                                   output.CurrentSize(), pool.get());
+
+    ASSERT_NOK_WITH_MSG(
+        BitmapDeletionVector::Deserialize(payload->data(), payload->size(), pool.get()),
+        fmt::format("invalid magic number: {}", bad_magic));
+}
+
+// Stripping the magic prefix from SerializeToBytes output yields valid input for
+// DeserializeWithoutMagicNumber.
+TEST(BitmapDeletionVectorTest, DeserializeWithoutMagicNumberShouldRoundTrip) {
+    auto pool = GetDefaultPool();
+
+    RoaringBitmap32 bitmap;
+    for (int32_t pos : {1, 7, 1024}) {
+        bitmap.Add(pos);
+    }
+    BitmapDeletionVector original(bitmap);
+
+    ASSERT_OK_AND_ASSIGN(auto bytes_with_magic, original.SerializeToBytes(pool));
+    const int32_t magic_size = BitmapDeletionVector::MAGIC_NUMBER_SIZE_BYTES;
+    ASSERT_OK_AND_ASSIGN(auto restored, BitmapDeletionVector::DeserializeWithoutMagicNumber(
+                                            bytes_with_magic->data() + magic_size,
+                                            bytes_with_magic->size() - magic_size, pool.get()));
+
+    for (int32_t pos : {1, 7, 1024}) {
+        ASSERT_TRUE(restored->IsDeleted(pos).value());
+    }
+    ASSERT_FALSE(restored->IsDeleted(8).value());
+}
+
+// Merging produces the union of both vectors, counting overlapping positions once.
+TEST(BitmapDeletionVectorTest, MergeTwoBitmapDeletionVectors) {
+    RoaringBitmap32 left;
+    for (int32_t pos : {1, 3, 5}) {
+        left.Add(pos);
+    }
+    auto dv1 = std::make_shared<BitmapDeletionVector>(left);
+
+    RoaringBitmap32 right;
+    for (int32_t pos : {2, 4, 5}) {  // 5 overlaps with dv1
+        right.Add(pos);
+    }
+    auto dv2 = std::make_shared<BitmapDeletionVector>(right);
+
+    ASSERT_OK(dv1->Merge(dv2));
+
+    for (int32_t pos : {1, 2, 3, 4, 5}) {
+        ASSERT_TRUE(dv1->IsDeleted(pos).value());
+    }
+    for (int32_t pos : {0, 6}) {
+        ASSERT_FALSE(dv1->IsDeleted(pos).value());
+    }
+    ASSERT_EQ(dv1->GetCardinality(), 5);
+}
+
+// Merging an empty vector leaves the target unchanged.
+TEST(BitmapDeletionVectorTest, MergeEmptyDeletionVector) {
+    RoaringBitmap32 target_bitmap;
+    target_bitmap.Add(10);
+    target_bitmap.Add(20);
+    auto target = std::make_shared<BitmapDeletionVector>(target_bitmap);
+
+    RoaringBitmap32 nothing;
+    auto empty_dv = std::make_shared<BitmapDeletionVector>(nothing);
+
+    ASSERT_OK(target->Merge(empty_dv));
+    ASSERT_EQ(target->GetCardinality(), 2);
+    for (int32_t pos : {10, 20}) {
+        ASSERT_TRUE(target->IsDeleted(pos).value());
+    }
+}
+
+// Merging a null pointer is a no-op.
+TEST(BitmapDeletionVectorTest, MergeNullDeletionVector) {
+    RoaringBitmap32 target_bitmap;
+    target_bitmap.Add(7);
+    auto target = std::make_shared<BitmapDeletionVector>(target_bitmap);
+
+    ASSERT_OK(target->Merge(nullptr));
+    ASSERT_EQ(target->GetCardinality(), 1);
+    ASSERT_TRUE(target->IsDeleted(7).value());
+}
+
+// Merging into an empty vector copies all positions of the source.
+TEST(BitmapDeletionVectorTest, MergeIntoEmptyDeletionVector) {
+    RoaringBitmap32 nothing;
+    auto target = std::make_shared<BitmapDeletionVector>(nothing);
+    ASSERT_TRUE(target->IsEmpty());
+
+    RoaringBitmap32 source_bitmap;
+    for (int32_t pos : {100, 200, 300}) {
+        source_bitmap.Add(pos);
+    }
+    auto source = std::make_shared<BitmapDeletionVector>(source_bitmap);
+
+    ASSERT_OK(target->Merge(source));
+    ASSERT_EQ(target->GetCardinality(), 3);
+    for (int32_t pos : {100, 200, 300}) {
+        ASSERT_TRUE(target->IsDeleted(pos).value());
+    }
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/core/deletionvectors/bucketed_dv_maintainer.h b/src/paimon/core/deletionvectors/bucketed_dv_maintainer.h
new file mode 100644
index 0000000..4088c34
--- /dev/null
+++ b/src/paimon/core/deletionvectors/bucketed_dv_maintainer.h
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <map>
+#include <memory>
+#include <optional>
+#include <string>
+
+#include "paimon/core/deletionvectors/bitmap_deletion_vector.h"
+#include "paimon/core/deletionvectors/deletion_vector.h"
+#include "paimon/core/deletionvectors/deletion_vectors_index_file.h"
+#include "paimon/core/index/index_file_handler.h"
+#include "paimon/core/index/index_file_meta.h"
+
+namespace paimon {
+
+/// Maintains the deletion vectors of one bucket and writes them back as a deletion-vectors
+/// index file when they have changed.
+///
+/// The map given to the constructor is copied, but the `DeletionVector` objects are shared via
+/// `std::shared_ptr`. `NotifyNewDeletion` on an already-tracked file therefore mutates the same
+/// object that the caller's map points to.
+class BucketedDvMaintainer {
+ public:
+    /// @param dv_index_file Index file used to persist the deletion vectors; must not be null.
+    ///        Its `Bitmap64()` setting decides which kind of deletion vector new files get.
+    /// @param deletion_vectors Existing deletion vectors keyed by data file name.
+    BucketedDvMaintainer(
+        const std::shared_ptr<DeletionVectorsIndexFile>& dv_index_file,
+        const std::map<std::string, std::shared_ptr<DeletionVector>>& deletion_vectors)
+        : dv_index_file_(dv_index_file),
+          deletion_vectors_(deletion_vectors),
+          bitmap64_(dv_index_file->Bitmap64()) {}
+
+    /// Records that row `position` of `file_name` is deleted.
+    ///
+    /// Files without a deletion vector get a new `BitmapDeletionVector`. It is added to the
+    /// maintained set only after the position has been accepted, so an invalid position never
+    /// leaves an empty deletion vector behind. Returns `Status::NotImplemented` when a new
+    /// deletion vector is needed but the index file uses 64-bit bitmaps.
+    Status NotifyNewDeletion(const std::string& file_name, int64_t position) {
+        if (auto it = deletion_vectors_.find(file_name); it != deletion_vectors_.end()) {
+            PAIMON_ASSIGN_OR_RAISE(bool updated, it->second->CheckedDelete(position));
+            if (updated) {
+                modified_ = true;
+            }
+            return Status::OK();
+        }
+        if (bitmap64_) {
+            return Status::NotImplemented("not support bitmap 64 deletion vectors");
+        }
+        RoaringBitmap32 roaring_bitmap;
+        auto dv = std::make_shared<BitmapDeletionVector>(roaring_bitmap);
+        PAIMON_ASSIGN_OR_RAISE(bool updated, dv->CheckedDelete(position));
+        deletion_vectors_.emplace(file_name, std::move(dv));
+        if (updated) {
+            modified_ = true;
+        }
+        return Status::OK();
+    }
+
+    /// Returns the deletion vector of `file_name`, or `std::nullopt` if it has none.
+    std::optional<std::shared_ptr<DeletionVector>> DeletionVectorOf(
+        const std::string& file_name) const {
+        if (auto it = deletion_vectors_.find(file_name); it != deletion_vectors_.end()) {
+            return it->second;
+        }
+        return std::nullopt;
+    }
+
+    /// Drops the deletion vector of `file_name`; marks the maintainer modified if one existed.
+    void RemoveDeletionVectorOf(const std::string& file_name) {
+        if (deletion_vectors_.erase(file_name) > 0) {
+            modified_ = true;
+        }
+    }
+
+    /// Writes all deletion vectors to a single index file if anything changed since the last
+    /// successful write. Returns the written file's meta, or an empty optional when there was
+    /// nothing to write.
+    Result<std::optional<std::shared_ptr<IndexFileMeta>>> WriteDeletionVectorsIndex() {
+        if (!modified_) {
+            return std::optional<std::shared_ptr<IndexFileMeta>>();
+        }
+        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<IndexFileMeta> result,
+                               dv_index_file_->WriteSingleFile(deletion_vectors_));
+        // Clear the flag only after the write succeeded so a failed write can be retried
+        // instead of silently dropping the pending changes.
+        modified_ = false;
+        return std::make_optional<std::shared_ptr<IndexFileMeta>>(result);
+    }
+
+    std::shared_ptr<DeletionVectorsIndexFile> DvIndexFile() const {
+        return dv_index_file_;
+    }
+
+    /// Factory to restore `BucketedDvMaintainer`.
+    class Factory {
+     public:
+        explicit Factory(const std::shared_ptr<IndexFileHandler>& index_file_handler)
+            : handler_(index_file_handler) {}
+
+        std::shared_ptr<IndexFileHandler> GetIndexFileHandler() const {
+            return handler_;
+        }
+
+        /// Reads the deletion vectors stored in `restored_files` through the handler and
+        /// builds a maintainer for `partition`/`bucket` from them.
+        Result<std::unique_ptr<BucketedDvMaintainer>> Create(
+            const BinaryRow& partition, int32_t bucket,
+            const std::vector<std::shared_ptr<IndexFileMeta>>& restored_files) const {
+            std::map<std::string, std::shared_ptr<DeletionVector>> deletion_vectors;
+            PAIMON_ASSIGN_OR_RAISE(deletion_vectors, handler_->ReadAllDeletionVectors(
+                                                         partition, bucket, restored_files));
+            return Create(partition, bucket, deletion_vectors);
+        }
+
+        /// Builds a maintainer for `partition`/`bucket` over the given deletion vectors, using
+        /// the handler's deletion-vectors index file for that bucket.
+        Result<std::unique_ptr<BucketedDvMaintainer>> Create(
+            const BinaryRow& partition, int32_t bucket,
+            const std::map<std::string, std::shared_ptr<DeletionVector>>& deletion_vectors)
+            const {
+            PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<DeletionVectorsIndexFile> dv_index_file,
+                                   handler_->DvIndex(partition, bucket));
+            return std::make_unique<BucketedDvMaintainer>(dv_index_file, deletion_vectors);
+        }
+
+     private:
+        std::shared_ptr<IndexFileHandler> handler_;
+    };
+
+ private:
+    std::shared_ptr<DeletionVectorsIndexFile> dv_index_file_;
+    std::map<std::string, std::shared_ptr<DeletionVector>> deletion_vectors_;
+    bool bitmap64_;
+    // True when deletion vectors changed since the last successful index write.
+    bool modified_ = false;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/deletionvectors/bucketed_dv_maintainer_test.cpp
b/src/paimon/core/deletionvectors/bucketed_dv_maintainer_test.cpp
new file mode 100644
index 0000000..f4ef95a
--- /dev/null
+++ b/src/paimon/core/deletionvectors/bucketed_dv_maintainer_test.cpp
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "paimon/core/deletionvectors/bucketed_dv_maintainer.h"
+
+#include <map>
+#include <memory>
+#include <string>
+
+#include "gtest/gtest.h"
+#include "paimon/core/deletionvectors/bitmap_deletion_vector.h"
+#include "paimon/fs/file_system_factory.h"
+#include "paimon/testing/mock/mock_index_path_factory.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+namespace {
+
+std::shared_ptr<DeletionVectorsIndexFile> CreateDvIndexFile(const std::string&
root_path,
+ bool bitmap64 =
false) {
+ auto memory_pool = GetDefaultPool();
+ EXPECT_OK_AND_ASSIGN(std::shared_ptr<FileSystem> fs,
+ FileSystemFactory::Get("local", root_path, {}));
+ auto path_factory = std::make_shared<MockIndexPathFactory>(root_path);
+ return std::make_shared<DeletionVectorsIndexFile>(fs, path_factory,
bitmap64, memory_pool);
+}
+
+std::shared_ptr<DeletionVector> CreateDeletionVector(int32_t begin, int32_t
end) {
+ RoaringBitmap32 bitmap;
+ for (int32_t value = begin; value < end; ++value) {
+ bitmap.Add(value);
+ }
+ return std::make_shared<BitmapDeletionVector>(bitmap);
+}
+
+} // namespace
+
+TEST(BucketedDvMaintainerTest, TestDeletionVectorLookupAndIndexFileGetter) {
+ auto dir = UniqueTestDirectory::Create();
+ ASSERT_TRUE(dir);
+ auto index_file = CreateDvIndexFile(dir->Str());
+ std::map<std::string, std::shared_ptr<DeletionVector>> deletion_vectors = {
+ {"file-a", CreateDeletionVector(0, 3)}, {"file-b",
CreateDeletionVector(10, 12)}};
+
+ BucketedDvMaintainer maintainer(index_file, deletion_vectors);
+
+ ASSERT_EQ(maintainer.DvIndexFile(), index_file);
+ auto lookup_hit = maintainer.DeletionVectorOf("file-a");
+ ASSERT_TRUE(lookup_hit.has_value());
+ auto lookup_miss = maintainer.DeletionVectorOf("missing");
+ ASSERT_FALSE(lookup_miss.has_value());
+}
+
+TEST(BucketedDvMaintainerTest, TestWriteDeletionVectorsIndexOnlyWhenModified) {
+ auto dir = UniqueTestDirectory::Create();
+ ASSERT_TRUE(dir);
+ auto index_file = CreateDvIndexFile(dir->Str());
+ std::map<std::string, std::shared_ptr<DeletionVector>> deletion_vectors = {
+ {"file-a", CreateDeletionVector(0, 3)},
+ {"file-b", CreateDeletionVector(10, 14)},
+ };
+
+ BucketedDvMaintainer maintainer(index_file, deletion_vectors);
+
+ // No modification yet, so no index file should be written.
+ ASSERT_OK_AND_ASSIGN(auto not_modified_write,
maintainer.WriteDeletionVectorsIndex());
+ ASSERT_FALSE(not_modified_write.has_value());
+
+ // Removing a missing key should keep the maintainer unmodified.
+ maintainer.RemoveDeletionVectorOf("missing");
+ ASSERT_OK_AND_ASSIGN(auto still_not_modified_write,
maintainer.WriteDeletionVectorsIndex());
+ ASSERT_FALSE(still_not_modified_write.has_value());
+
+ // Removing an existing key marks the maintainer modified and triggers one
write.
+ maintainer.RemoveDeletionVectorOf("file-a");
+ ASSERT_OK_AND_ASSIGN(auto modified_write,
maintainer.WriteDeletionVectorsIndex());
+ ASSERT_TRUE(modified_write.has_value());
+ ASSERT_GT(modified_write.value()->FileSize(), 0);
+
+ // Modification flag should be reset after a successful write.
+ ASSERT_OK_AND_ASSIGN(auto write_after_reset,
maintainer.WriteDeletionVectorsIndex());
+ ASSERT_FALSE(write_after_reset.has_value());
+}
+
+TEST(BucketedDvMaintainerTest,
TestNotifyNewDeletionCreatesVectorAndMarksModified) {
+ auto dir = UniqueTestDirectory::Create();
+ ASSERT_TRUE(dir);
+ auto index_file = CreateDvIndexFile(dir->Str());
+ BucketedDvMaintainer maintainer(index_file, /*deletion_vectors=*/{});
+
+ ASSERT_OK(maintainer.NotifyNewDeletion("file-new", /*position=*/7));
+
+ auto created = maintainer.DeletionVectorOf("file-new");
+ ASSERT_TRUE(created.has_value());
+ ASSERT_OK_AND_ASSIGN(bool deleted,
created.value()->IsDeleted(/*position=*/7));
+ ASSERT_TRUE(deleted);
+
+ ASSERT_OK_AND_ASSIGN(auto modified_write,
maintainer.WriteDeletionVectorsIndex());
+ ASSERT_TRUE(modified_write.has_value());
+
+ ASSERT_OK_AND_ASSIGN(auto write_after_reset,
maintainer.WriteDeletionVectorsIndex());
+ ASSERT_FALSE(write_after_reset.has_value());
+}
+
+TEST(BucketedDvMaintainerTest,
TestNotifyNewDeletionOnExistingVectorOnlyMarksWhenUpdated) {
+ auto dir = UniqueTestDirectory::Create();
+ ASSERT_TRUE(dir);
+ auto index_file = CreateDvIndexFile(dir->Str());
+ std::map<std::string, std::shared_ptr<DeletionVector>> deletion_vectors = {
+ {"file-a", CreateDeletionVector(0, 3)}}; // already contains 0,1,2
+ BucketedDvMaintainer maintainer(index_file, deletion_vectors);
+
+ // Deleting an existing position should not mark modified.
+ ASSERT_OK(maintainer.NotifyNewDeletion("file-a", /*position=*/2));
+ ASSERT_OK_AND_ASSIGN(auto not_modified_write,
maintainer.WriteDeletionVectorsIndex());
+ ASSERT_FALSE(not_modified_write.has_value());
+
+ // Deleting a new position should mark modified.
+ ASSERT_OK(maintainer.NotifyNewDeletion("file-a", /*position=*/9));
+ ASSERT_OK_AND_ASSIGN(auto modified_write,
maintainer.WriteDeletionVectorsIndex());
+ ASSERT_TRUE(modified_write.has_value());
+}
+
+TEST(BucketedDvMaintainerTest,
TestNotifyNewDeletionReturnsNotImplementedForBitmap64) {
+ auto dir = UniqueTestDirectory::Create();
+ ASSERT_TRUE(dir);
+ auto index_file = CreateDvIndexFile(dir->Str(), /*bitmap64=*/true);
+ BucketedDvMaintainer maintainer(index_file, /*deletion_vectors=*/{});
+
+ Status status = maintainer.NotifyNewDeletion("file-new", /*position=*/1);
+ ASSERT_TRUE(status.IsNotImplemented());
+
+ auto lookup = maintainer.DeletionVectorOf("file-new");
+ ASSERT_FALSE(lookup.has_value());
+
+ ASSERT_OK_AND_ASSIGN(auto write_result,
maintainer.WriteDeletionVectorsIndex());
+ ASSERT_FALSE(write_result.has_value());
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/core/deletionvectors/deletion_file_writer.cpp
b/src/paimon/core/deletionvectors/deletion_file_writer.cpp
new file mode 100644
index 0000000..9942708
--- /dev/null
+++ b/src/paimon/core/deletionvectors/deletion_file_writer.cpp
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "paimon/core/deletionvectors/deletion_file_writer.h"
+
+#include "paimon/common/io/data_output_stream.h"
+#include "paimon/core/deletionvectors/deletion_vectors_index_file.h"
+
+namespace paimon {
+
+Result<std::unique_ptr<DeletionFileWriter>> DeletionFileWriter::Create(
+ const std::shared_ptr<IndexPathFactory>& path_factory, const
std::shared_ptr<FileSystem>& fs,
+ const std::shared_ptr<MemoryPool>& pool) {
+ std::string path = path_factory->NewPath();
+ bool is_external_path = path_factory->IsExternalPath();
+ PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<OutputStream> out, fs->Create(path,
/*overwrite=*/true));
+ DataOutputStream output_stream(out);
+
PAIMON_RETURN_NOT_OK(output_stream.WriteValue<int8_t>(DeletionVectorsIndexFile::VERSION_ID_V1));
+ return std::unique_ptr<DeletionFileWriter>(
+ new DeletionFileWriter(path, is_external_path, out, pool));
+}
+
+Status DeletionFileWriter::Write(const std::string& key,
+ const std::shared_ptr<DeletionVector>&
deletion_vector) {
+ PAIMON_ASSIGN_OR_RAISE(int64_t start, out_->GetPos());
+ if (start < 0 || start > std::numeric_limits<int32_t>::max()) {
+ return Status::Invalid(fmt::format("Output position {} out of int32
range.", start));
+ }
+ DataOutputStream output_stream(out_);
+ PAIMON_ASSIGN_OR_RAISE(int32_t length, deletion_vector->SerializeTo(pool_,
&output_stream));
+ dv_metas_.insert(key, DeletionVectorMeta(key, static_cast<int32_t>(start),
length,
+
deletion_vector->GetCardinality()));
+ return Status::OK();
+}
+
+Result<std::unique_ptr<IndexFileMeta>> DeletionFileWriter::GetResult() const {
+ int64_t length = output_bytes_;
+ if (length < 0 || length > std::numeric_limits<int32_t>::max()) {
+ return Status::Invalid(
+ fmt::format("Deletion file result length {} out of int32 range.",
length));
+ }
+ std::optional<std::string> final_path;
+ if (is_external_path_) {
+ PAIMON_ASSIGN_OR_RAISE(Path external_path, PathUtil::ToPath(path_));
+ final_path = external_path.ToString();
+ }
+ return
std::make_unique<IndexFileMeta>(DeletionVectorsIndexFile::DELETION_VECTORS_INDEX,
+ PathUtil::GetName(path_), length,
dv_metas_.size(),
+ dv_metas_, final_path);
+}
+
+} // namespace paimon
diff --git a/src/paimon/core/deletionvectors/deletion_file_writer.h
b/src/paimon/core/deletionvectors/deletion_file_writer.h
new file mode 100644
index 0000000..9a04b23
--- /dev/null
+++ b/src/paimon/core/deletionvectors/deletion_file_writer.h
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "fmt/format.h"
+#include "paimon/common/utils/linked_hash_map.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/core/deletionvectors/deletion_vector.h"
+#include "paimon/core/index/deletion_vector_meta.h"
+#include "paimon/core/index/index_path_factory.h"
+#include "paimon/fs/file_system.h"
+
+namespace paimon {
+
+/// Writer to write deletion file.
+class DeletionFileWriter {
+ public:
+ static Result<std::unique_ptr<DeletionFileWriter>> Create(
+ const std::shared_ptr<IndexPathFactory>& path_factory,
+ const std::shared_ptr<FileSystem>& fs, const
std::shared_ptr<MemoryPool>& pool);
+
+ Result<int64_t> GetPos() const {
+ return out_->GetPos();
+ }
+
+ Status Write(const std::string& key, const
std::shared_ptr<DeletionVector>& deletion_vector);
+
+ Status Close() {
+ PAIMON_RETURN_NOT_OK(out_->Flush());
+ PAIMON_ASSIGN_OR_RAISE(output_bytes_, out_->GetPos());
+ return out_->Close();
+ }
+
+ Result<std::unique_ptr<IndexFileMeta>> GetResult() const;
+
+ private:
+ DeletionFileWriter(const std::string& path, bool is_external_path,
+ std::shared_ptr<OutputStream>& out, const
std::shared_ptr<MemoryPool>& pool)
+ : path_(path), is_external_path_(is_external_path),
out_(std::move(out)), pool_(pool) {}
+
+ std::string path_;
+ bool is_external_path_;
+ std::shared_ptr<OutputStream> out_;
+ std::shared_ptr<MemoryPool> pool_;
+ LinkedHashMap<std::string, DeletionVectorMeta> dv_metas_;
+ int64_t output_bytes_ = -1;
+};
+
+} // namespace paimon
diff --git a/src/paimon/core/deletionvectors/deletion_file_writer_test.cpp
b/src/paimon/core/deletionvectors/deletion_file_writer_test.cpp
new file mode 100644
index 0000000..93777f0
--- /dev/null
+++ b/src/paimon/core/deletionvectors/deletion_file_writer_test.cpp
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "paimon/core/deletionvectors/deletion_file_writer.h"
+
+#include <map>
+#include <memory>
+#include <string>
+
+#include "gtest/gtest.h"
+#include "paimon/core/deletionvectors/bitmap_deletion_vector.h"
+#include "paimon/core/deletionvectors/deletion_vectors_index_file.h"
+#include "paimon/fs/file_system_factory.h"
+#include "paimon/testing/mock/mock_index_path_factory.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+TEST(DeletionFileWriterTest, WriteCloseAndReadBack) {
+ auto dir = UniqueTestDirectory::Create();
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<FileSystem> fs,
+ FileSystemFactory::Get("local", dir->Str(), {}));
+ auto path_factory = std::make_shared<MockIndexPathFactory>(dir->Str());
+ auto pool = GetDefaultPool();
+
+ ASSERT_OK_AND_ASSIGN(auto writer, DeletionFileWriter::Create(path_factory,
fs, pool));
+
+ RoaringBitmap32 roaring_1;
+ roaring_1.Add(1);
+ roaring_1.Add(3);
+ roaring_1.Add(5);
+ auto dv_1 = std::make_shared<BitmapDeletionVector>(roaring_1);
+
+ RoaringBitmap32 roaring_2;
+ roaring_2.Add(100);
+ auto dv_2 = std::make_shared<BitmapDeletionVector>(roaring_2);
+
+ ASSERT_OK(writer->Write("data-file-1", dv_1));
+ ASSERT_OK(writer->Write("data-file-2", dv_2));
+ ASSERT_OK(writer->Close());
+
+ ASSERT_OK_AND_ASSIGN(auto meta_unique, writer->GetResult());
+ ASSERT_EQ(meta_unique->IndexType(),
DeletionVectorsIndexFile::DELETION_VECTORS_INDEX);
+ ASSERT_EQ(meta_unique->FileName(), "index-0");
+ ASSERT_EQ(meta_unique->RowCount(), 2);
+ ASSERT_EQ(meta_unique->ExternalPath(), std::nullopt);
+ ASSERT_GT(meta_unique->FileSize(), 0);
+
+ const auto& dv_ranges = meta_unique->DvRanges();
+ ASSERT_TRUE(dv_ranges.has_value());
+ ASSERT_EQ(dv_ranges->size(), 2);
+
+ auto first = dv_ranges->find("data-file-1");
+ auto second = dv_ranges->find("data-file-2");
+ ASSERT_NE(first, dv_ranges->end());
+ ASSERT_NE(second, dv_ranges->end());
+ ASSERT_EQ(first->second.GetOffset(), 1);
+ ASSERT_GT(second->second.GetOffset(), first->second.GetOffset());
+
+ std::shared_ptr<IndexFileMeta> meta = std::move(meta_unique);
+ DeletionVectorsIndexFile index_file(fs, path_factory, /*bitmap64=*/false,
pool);
+ ASSERT_OK_AND_ASSIGN(auto read_back,
index_file.ReadAllDeletionVectors(meta));
+
+ ASSERT_EQ(read_back.size(), 2);
+ ASSERT_EQ(read_back.at("data-file-1")->GetCardinality(), 3);
+ ASSERT_EQ(read_back.at("data-file-2")->GetCardinality(), 1);
+
+ ASSERT_OK_AND_ASSIGN(bool is_deleted,
read_back.at("data-file-1")->IsDeleted(3));
+ ASSERT_TRUE(is_deleted);
+ ASSERT_OK_AND_ASSIGN(is_deleted,
read_back.at("data-file-1")->IsDeleted(4));
+ ASSERT_FALSE(is_deleted);
+}
+
+TEST(DeletionFileWriterTest, GetResultWithoutCloseShouldFail) {
+ auto dir = UniqueTestDirectory::Create();
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<FileSystem> fs,
+ FileSystemFactory::Get("local", dir->Str(), {}));
+ auto path_factory = std::make_shared<MockIndexPathFactory>(dir->Str());
+ auto pool = GetDefaultPool();
+
+ ASSERT_OK_AND_ASSIGN(auto writer, DeletionFileWriter::Create(path_factory,
fs, pool));
+ ASSERT_NOK_WITH_MSG(writer->GetResult(), "result length -1 out of int32
range");
+}
+
+TEST(DeletionFileWriterTest, ExternalPathInResult) {
+ auto dir = UniqueTestDirectory::Create();
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<FileSystem> fs,
+ FileSystemFactory::Get("local", dir->Str(), {}));
+ auto path_factory = std::make_shared<MockIndexPathFactory>(dir->Str());
+ path_factory->SetExternal(true);
+ auto pool = GetDefaultPool();
+
+ ASSERT_OK_AND_ASSIGN(auto writer, DeletionFileWriter::Create(path_factory,
fs, pool));
+
+ RoaringBitmap32 roaring;
+ roaring.Add(0);
+ auto dv = std::make_shared<BitmapDeletionVector>(roaring);
+ ASSERT_OK(writer->Write("data-file-ext", dv));
+ ASSERT_OK(writer->Close());
+
+ ASSERT_OK_AND_ASSIGN(auto meta, writer->GetResult());
+ ASSERT_TRUE(meta->ExternalPath().has_value());
+ ASSERT_EQ(meta->ExternalPath().value(), PathUtil::JoinPath(dir->Str(),
"index-0"));
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/core/deletionvectors/deletion_vector.cpp
b/src/paimon/core/deletionvectors/deletion_vector.cpp
new file mode 100644
index 0000000..82e347d
--- /dev/null
+++ b/src/paimon/core/deletionvectors/deletion_vector.cpp
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "paimon/core/deletionvectors/deletion_vector.h"
+
+#include <cstddef>
+#include <string>
+
+#include "fmt/format.h"
+#include "paimon/core/deletionvectors/bitmap64_deletion_vector.h"
+#include "paimon/core/deletionvectors/bitmap_deletion_vector.h"
+#include "paimon/core/deletionvectors/bucketed_dv_maintainer.h"
+#include "paimon/core/table/source/deletion_file.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/io/data_input_stream.h"
+#include "paimon/memory/memory_pool.h"
+
+namespace paimon {
+
+DeletionVector::Factory DeletionVector::CreateFactory(
+ const std::shared_ptr<FileSystem>& file_system,
+ const std::unordered_map<std::string, DeletionFile>& deletion_file_map,
+ const std::shared_ptr<MemoryPool>& pool) {
+ return [file_system, deletion_file_map,
+ pool](const std::string& file_name) ->
Result<std::shared_ptr<DeletionVector>> {
+ auto iter = deletion_file_map.find(file_name);
+ if (iter != deletion_file_map.end()) {
+ PAIMON_ASSIGN_OR_RAISE(
+ std::shared_ptr<DeletionVector> dv,
+ DeletionVector::Read(file_system.get(), iter->second,
pool.get()));
+ return dv;
+ }
+ return std::shared_ptr<DeletionVector>();
+ };
+}
+
+DeletionVector::Factory DeletionVector::CreateFactory(
+ const std::shared_ptr<BucketedDvMaintainer>& dv_maintainer) {
+ return
+ [dv_maintainer](const std::string& file_name) ->
Result<std::shared_ptr<DeletionVector>> {
+ if (dv_maintainer) {
+ return dv_maintainer->DeletionVectorOf(file_name).value_or(
+ std::shared_ptr<DeletionVector>());
+ }
+ return std::shared_ptr<DeletionVector>();
+ };
+}
+
+Result<PAIMON_UNIQUE_PTR<DeletionVector>>
DeletionVector::DeserializeFromBytes(const Bytes* bytes,
+
MemoryPool* pool) {
+ return BitmapDeletionVector::Deserialize(bytes->data(), bytes->size(),
pool);
+}
+
+Result<PAIMON_UNIQUE_PTR<DeletionVector>> DeletionVector::Read(const
FileSystem* file_system,
+ const
DeletionFile& deletion_file,
+ MemoryPool*
pool) {
+ PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<InputStream> input,
+ file_system->Open(deletion_file.path));
+ DataInputStream file_input_stream(input);
+ PAIMON_RETURN_NOT_OK(file_input_stream.Seek(deletion_file.offset));
+ PAIMON_ASSIGN_OR_RAISE(int32_t actual_length,
file_input_stream.ReadValue<int32_t>());
+ if (actual_length != deletion_file.length) {
+ return Status::Invalid(
+ fmt::format("Size not match, actual size: {}, expect size: {},
file path: {}",
+ actual_length, deletion_file.length,
deletion_file.path));
+ }
+ auto bytes = Bytes::AllocateBytes(deletion_file.length, pool);
+ PAIMON_RETURN_NOT_OK(file_input_stream.ReadBytes(bytes.get()));
+ return DeserializeFromBytes(bytes.get(), pool);
+}
+
+Result<PAIMON_UNIQUE_PTR<DeletionVector>>
DeletionVector::Read(DataInputStream* input_stream,
+
std::optional<int64_t> length,
+ MemoryPool*
pool) {
+ PAIMON_ASSIGN_OR_RAISE(int32_t bitmap_length,
input_stream->ReadValue<int32_t>());
+ PAIMON_ASSIGN_OR_RAISE(int32_t magic_number,
input_stream->ReadValue<int32_t>());
+
+ if (magic_number == BitmapDeletionVector::MAGIC_NUMBER) {
+ if (length.has_value() && bitmap_length != length.value()) {
+ return Status::Invalid(fmt::format("Size not match, actual size:
{}, expected size: {}",
+ bitmap_length, length.value()));
+ }
+
+ int32_t payload_length = bitmap_length -
BitmapDeletionVector::MAGIC_NUMBER_SIZE_BYTES;
+ if (payload_length < 0) {
+ return Status::Invalid(fmt::format("Invalid bitmap length: {}",
bitmap_length));
+ }
+
+ auto bytes = Bytes::AllocateBytes(payload_length, pool);
+ PAIMON_RETURN_NOT_OK(input_stream->ReadBytes(bytes.get()));
+ // skip crc (4 bytes)
+ PAIMON_ASSIGN_OR_RAISE([[maybe_unused]] int32_t unused_crc,
+ input_stream->ReadValue<int32_t>());
+
+ return
BitmapDeletionVector::DeserializeWithoutMagicNumber(bytes->data(),
bytes->size(),
+ pool);
+ } else if (EndianSwapValue(magic_number) ==
Bitmap64DeletionVector::MAGIC_NUMBER) {
+ return Status::NotImplemented(
+ "bitmap64 deletion vectors are not supported in this version, "
+ "please use bitmap deletion vectors instead or upgrade to a
version "
+ "that supports bitmap64.");
+ }
+
+ return Status::Invalid(fmt::format(
+ "Invalid magic number: {}, v1 dv magic number: {}, v2 magic number:
{}", magic_number,
+ BitmapDeletionVector::MAGIC_NUMBER,
Bitmap64DeletionVector::MAGIC_NUMBER));
+}
+
+PAIMON_UNIQUE_PTR<DeletionVector> DeletionVector::FromPrimitiveArray(
+ const std::vector<char>& is_deleted, MemoryPool* pool) {
+ RoaringBitmap32 roaring;
+ for (size_t i = 0; i < is_deleted.size(); i++) {
+ if (is_deleted[i] == static_cast<char>(1)) {
+ roaring.Add(i);
+ }
+ }
+ return pool->AllocateUnique<BitmapDeletionVector>(roaring);
+}
+
+} // namespace paimon
diff --git a/src/paimon/core/deletionvectors/deletion_vector.h
b/src/paimon/core/deletionvectors/deletion_vector.h
new file mode 100644
index 0000000..bc53af2
--- /dev/null
+++ b/src/paimon/core/deletionvectors/deletion_vector.h
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
#include <cstdint>
#include <functional>
#include <limits>
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
+
+#include "paimon/core/io/data_file_meta.h"
+#include "paimon/core/table/source/deletion_file.h"
+#include "paimon/io/data_input_stream.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/utils/roaring_bitmap32.h"
+
+namespace paimon {
+class FileSystem;
+class DataOutputStream;
+class BucketedDvMaintainer;
+struct DeletionFile;
+
+/// The DeletionVector can efficiently record the positions of rows that are
deleted in a file,
+/// which can then be used to filter out deleted rows when processing the file.
+class DeletionVector {
+ public:
+ using Factory =
std::function<Result<std::shared_ptr<DeletionVector>>(const std::string&)>;
+
+ static Factory CreateFactory(
+ const std::shared_ptr<FileSystem>& file_system,
+ const std::unordered_map<std::string, DeletionFile>& deletion_file_map,
+ const std::shared_ptr<MemoryPool>& pool);
+
+ static Factory CreateFactory(const std::shared_ptr<BucketedDvMaintainer>&
dv_maintainer);
+
+ virtual ~DeletionVector() = default;
+
+ /// Marks the row at the specified position as deleted.
+ ///
+ /// @param position The position of the row to be marked as deleted.
+ virtual Status Delete(int64_t position) = 0;
+
+ /// Marks the row at the specified position as deleted.
+ ///
+ /// @param position The position of the row to be marked as deleted.
+ /// @return true if the added position wasn't already deleted. False
otherwise.
+ virtual Result<bool> CheckedDelete(int64_t position) = 0;
+
+ /// Checks if the row at the specified position is marked as deleted.
+ ///
+ /// @param position The position of the row to check.
+ /// @return true if the row is marked as deleted, false otherwise.
+ virtual Result<bool> IsDeleted(int64_t position) const = 0;
+
+ Result<RoaringBitmap32> IsValid(int64_t start_position, int64_t length)
const {
+ RoaringBitmap32 is_valid;
+ for (int64_t i = 0; i < length; i++) {
+ PAIMON_ASSIGN_OR_RAISE(bool is_deleted, IsDeleted(start_position +
i));
+ if (!is_deleted) {
+ is_valid.Add(i);
+ }
+ }
+ return is_valid;
+ }
+
+ /// Merges another DeletionVector into this current one.
+ ///
+ /// This method combines the deletion positions from the other deletion
vector
+ /// with the current one, marking all positions deleted in either vector
as deleted.
+ ///
+ /// @param deletion_vector The other DeletionVector to merge into this one.
+ /// @return Status indicating success or failure of the merge operation.
+ virtual Status Merge(const std::shared_ptr<DeletionVector>&
deletion_vector) = 0;
+
+ /// Determines if the deletion vector is empty, indicating no deletions.
+ ///
+ /// @return true if the deletion vector is empty, false if it contains
deletions.
+ virtual bool IsEmpty() const = 0;
+
+ /// @return the number of distinct integers added to the DeletionVector.
+ virtual int64_t GetCardinality() const = 0;
+
+ /// Serializes the deletion vector.
+ virtual Result<int32_t> SerializeTo(const std::shared_ptr<MemoryPool>&
pool,
+ DataOutputStream* out) = 0;
+
+ /// Serializes the deletion vector to a byte array for storage or
transmission.
+ ///
+ /// @return A byte array representing the serialized deletion vector.
+ virtual Result<PAIMON_UNIQUE_PTR<Bytes>> SerializeToBytes(
+ const std::shared_ptr<MemoryPool>& pool) = 0;
+
+ /// Deserializes a deletion vector from a byte array.
+ ///
+ /// @param bytes The byte array containing the serialized deletion vector.
+ /// @return A DeletionVector instance that represents the deserialized
data.
+ static Result<PAIMON_UNIQUE_PTR<DeletionVector>>
DeserializeFromBytes(const Bytes* bytes,
+
MemoryPool* pool);
+
+ static Result<PAIMON_UNIQUE_PTR<DeletionVector>> Read(const FileSystem*
file_system,
+ const DeletionFile&
deletion_file,
+ MemoryPool* pool);
+
+ static Result<PAIMON_UNIQUE_PTR<DeletionVector>> Read(DataInputStream*
input_stream,
+
std::optional<int64_t> length,
+ MemoryPool* pool);
+
+ static PAIMON_UNIQUE_PTR<DeletionVector> FromPrimitiveArray(const
std::vector<char>& is_deleted,
+ MemoryPool*
pool);
+};
+
+} // namespace paimon
diff --git
a/src/paimon/core/deletionvectors/deletion_vector_index_file_writer.cpp
b/src/paimon/core/deletionvectors/deletion_vector_index_file_writer.cpp
new file mode 100644
index 0000000..7fbc045
--- /dev/null
+++ b/src/paimon/core/deletionvectors/deletion_vector_index_file_writer.cpp
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "paimon/core/deletionvectors/deletion_vector_index_file_writer.h"
+
+#include "paimon/common/utils/scope_guard.h"
+#include "paimon/core/deletionvectors/deletion_file_writer.h"
+
+namespace paimon {
+
+Result<std::shared_ptr<IndexFileMeta>>
DeletionVectorIndexFileWriter::WriteSingleFile(
+ const std::map<std::string, std::shared_ptr<DeletionVector>>& input) {
+ PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<DeletionFileWriter> writer,
+ DeletionFileWriter::Create(index_path_factory_,
fs_, pool_));
+ ScopeGuard guard([&]() {
+ if (writer) {
+ (void)writer->Close();
+ }
+ });
+ for (const auto& [key, value] : input) {
+ PAIMON_RETURN_NOT_OK(writer->Write(key, value));
+ }
+ guard.Release();
+ PAIMON_RETURN_NOT_OK(writer->Close());
+ PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<IndexFileMeta> result,
writer->GetResult());
+ return result;
+}
+
+} // namespace paimon
diff --git
a/src/paimon/core/deletionvectors/deletion_vector_index_file_writer.h
b/src/paimon/core/deletionvectors/deletion_vector_index_file_writer.h
new file mode 100644
index 0000000..d8395b8
--- /dev/null
+++ b/src/paimon/core/deletionvectors/deletion_vector_index_file_writer.h
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <map>
+#include <memory>
+
+#include "paimon/core/deletionvectors/deletion_vector.h"
+#include "paimon/core/index/index_path_factory.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// Writer for deletion vector index file.
+class DeletionVectorIndexFileWriter {
+ public:
+ DeletionVectorIndexFileWriter(const std::shared_ptr<FileSystem>& fs,
+ const std::shared_ptr<IndexPathFactory>&
path_factory,
+ const std::shared_ptr<MemoryPool>& pool)
+ : index_path_factory_(path_factory), fs_(fs), pool_(pool) {}
+
+ /// The deletion file of the bucketed table is updated according to the
bucket. If a compaction
+ /// occurs and there is no longer a deletion file, an empty deletion file
needs to be generated
+ /// to overwrite the old file.
+ /// TODO(yonghao.fyh): We can consider sending a message to delete the
deletion file in the
+ /// future.
+ Result<std::shared_ptr<IndexFileMeta>> WriteSingleFile(
+ const std::map<std::string, std::shared_ptr<DeletionVector>>& input);
+
+ private:
+ std::shared_ptr<IndexPathFactory> index_path_factory_;
+ std::shared_ptr<FileSystem> fs_;
+ std::shared_ptr<MemoryPool> pool_;
+};
+
+} // namespace paimon
diff --git
a/src/paimon/core/deletionvectors/deletion_vector_index_file_writer_test.cpp
b/src/paimon/core/deletionvectors/deletion_vector_index_file_writer_test.cpp
new file mode 100644
index 0000000..1d48027
--- /dev/null
+++ b/src/paimon/core/deletionvectors/deletion_vector_index_file_writer_test.cpp
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "paimon/core/deletionvectors/deletion_vector_index_file_writer.h"
+
+#include <map>
+#include <memory>
+#include <string>
+
+#include "gtest/gtest.h"
+#include "paimon/core/deletionvectors/bitmap_deletion_vector.h"
+#include "paimon/core/deletionvectors/deletion_vectors_index_file.h"
+#include "paimon/fs/file_system_factory.h"
+#include "paimon/testing/mock/mock_index_path_factory.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+namespace {
+
+class FailingDeletionVector : public DeletionVector {
+ public:
+ Status Delete(int64_t) override {
+ return Status::Invalid("injected delete failure");
+ }
+
+ Result<bool> CheckedDelete(int64_t) override {
+ return Status::Invalid("injected checked-delete failure");
+ }
+
+ Result<bool> IsDeleted(int64_t) const override {
+ return Status::Invalid("injected is-deleted failure");
+ }
+
+ bool IsEmpty() const override {
+ return true;
+ }
+
+ int64_t GetCardinality() const override {
+ return 0;
+ }
+
+ Result<int32_t> SerializeTo(const std::shared_ptr<MemoryPool>&,
DataOutputStream*) override {
+ return Status::Invalid("injected serialize failure");
+ }
+
+ Result<PAIMON_UNIQUE_PTR<Bytes>> SerializeToBytes(const
std::shared_ptr<MemoryPool>&) override {
+ return Status::Invalid("injected serialize failure");
+ }
+
+ Status Merge(const std::shared_ptr<DeletionVector>&) override {
+ return Status::Invalid("injected merge failure");
+ }
+};
+
+} // namespace
+
+TEST(DeletionVectorIndexFileWriterTest, WriteSingleFileRoundTrip) {
+ auto dir = UniqueTestDirectory::Create();
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<FileSystem> fs,
+ FileSystemFactory::Get("local", dir->Str(), {}));
+ auto path_factory = std::make_shared<MockIndexPathFactory>(dir->Str());
+ auto pool = GetDefaultPool();
+
+ DeletionVectorIndexFileWriter writer(fs, path_factory, pool);
+
+ std::map<std::string, std::shared_ptr<DeletionVector>> input;
+ RoaringBitmap32 roaring_1;
+ roaring_1.Add(1);
+ roaring_1.Add(2);
+ input["data-a"] = std::make_shared<BitmapDeletionVector>(roaring_1);
+
+ RoaringBitmap32 roaring_2;
+ roaring_2.Add(10);
+ input["data-b"] = std::make_shared<BitmapDeletionVector>(roaring_2);
+
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<IndexFileMeta> meta,
writer.WriteSingleFile(input));
+ ASSERT_EQ(meta->IndexType(),
DeletionVectorsIndexFile::DELETION_VECTORS_INDEX);
+ ASSERT_EQ(meta->FileName(), "index-0");
+ ASSERT_EQ(meta->RowCount(), 2);
+
+ DeletionVectorsIndexFile index_file(fs, path_factory, /*bitmap64=*/false,
pool);
+ ASSERT_OK_AND_ASSIGN(auto read_back,
index_file.ReadAllDeletionVectors(meta));
+ ASSERT_EQ(read_back.size(), 2);
+
+ ASSERT_OK_AND_ASSIGN(bool is_deleted,
read_back.at("data-a")->IsDeleted(1));
+ ASSERT_TRUE(is_deleted);
+ ASSERT_OK_AND_ASSIGN(is_deleted, read_back.at("data-a")->IsDeleted(3));
+ ASSERT_FALSE(is_deleted);
+
+ ASSERT_OK_AND_ASSIGN(is_deleted, read_back.at("data-b")->IsDeleted(10));
+ ASSERT_TRUE(is_deleted);
+}
+
+TEST(DeletionVectorIndexFileWriterTest,
WriteSingleFileShouldReturnSerializeError) {
+ auto dir = UniqueTestDirectory::Create();
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<FileSystem> fs,
+ FileSystemFactory::Get("local", dir->Str(), {}));
+ auto path_factory = std::make_shared<MockIndexPathFactory>(dir->Str());
+ auto pool = GetDefaultPool();
+
+ DeletionVectorIndexFileWriter writer(fs, path_factory, pool);
+
+ std::map<std::string, std::shared_ptr<DeletionVector>> input;
+ input["bad"] = std::make_shared<FailingDeletionVector>();
+
+ ASSERT_NOK_WITH_MSG(writer.WriteSingleFile(input), "injected serialize
failure");
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/core/deletionvectors/deletion_vector_test.cpp
b/src/paimon/core/deletionvectors/deletion_vector_test.cpp
new file mode 100644
index 0000000..e8742cd
--- /dev/null
+++ b/src/paimon/core/deletionvectors/deletion_vector_test.cpp
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "paimon/core/deletionvectors/deletion_vector.h"
+
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <set>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/core/deletionvectors/bitmap64_deletion_vector.h"
+#include "paimon/core/deletionvectors/bitmap_deletion_vector.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/io/byte_order.h"
+#include "paimon/io/data_input_stream.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+namespace {
+
// Appends `value` to `bytes` as four bytes, most significant byte first.
void AppendInt32BigEndian(std::vector<uint8_t>* bytes, int32_t value) {
    // Work on the unsigned bit pattern so the shifts are well defined for negatives.
    const auto bits = static_cast<uint32_t>(value);
    for (int shift = 24; shift >= 0; shift -= 8) {
        bytes->push_back(static_cast<uint8_t>((bits >> shift) & 0xFFu));
    }
}
+
+} // namespace
+
+TEST(DeletionVectorTest, TestSimple) {
+ std::set<int32_t> to_deleted;
+ for (int32_t i = 0; i < 10000; i++) {
+ to_deleted.insert(paimon::test::RandomNumber(0, 100000000l));
+ }
+ std::set<int32_t> not_deleted;
+ for (int32_t i = 0; i < 10000; i++) {
+ if (to_deleted.find(i) == to_deleted.end()) {
+ not_deleted.insert(i);
+ }
+ }
+ RoaringBitmap32 roaring;
+ auto deletion_vector = std::make_unique<BitmapDeletionVector>(roaring);
+ ASSERT_TRUE(deletion_vector->IsEmpty());
+ for (auto i : to_deleted) {
+ if (i % 2 == 0) {
+ ASSERT_OK(deletion_vector->Delete(i));
+ } else {
+ ASSERT_TRUE(deletion_vector->CheckedDelete(i).value());
+ ASSERT_FALSE(deletion_vector->CheckedDelete(i).value());
+ }
+ }
+ auto pool = GetDefaultPool();
+ ASSERT_OK_AND_ASSIGN(auto bytes, deletion_vector->SerializeToBytes(pool));
+ ASSERT_OK_AND_ASSIGN(auto de_deletion_vector,
+ DeletionVector::DeserializeFromBytes(bytes.get(),
pool.get()));
+
+ ASSERT_FALSE(deletion_vector->IsEmpty());
+ ASSERT_FALSE(de_deletion_vector->IsEmpty());
+
+ for (auto i : to_deleted) {
+ ASSERT_TRUE(deletion_vector->IsDeleted(i).value());
+ ASSERT_TRUE(de_deletion_vector->IsDeleted(i).value());
+ }
+ for (auto i : not_deleted) {
+ ASSERT_FALSE(deletion_vector->IsDeleted(i).value());
+ ASSERT_FALSE(de_deletion_vector->IsDeleted(i).value());
+ }
+}
+TEST(DeletionVectorTest, TestCompatibleWithJava) {
+ // generated from java, with magic_num and bitmap, deleted row is {1, 2, 4}
+ std::vector<uint8_t> data = {94, 67, 242, 208, 58, 48, 0, 0, 1, 0, 0, 0, 0,
+ 0, 2, 0, 16, 0, 0, 0, 1, 0, 2, 0, 4,
0};
+ auto pool = GetDefaultPool();
+ auto serialize_bytes = std::make_shared<Bytes>(data.size(), pool.get());
+ memcpy(serialize_bytes->data(), data.data(), data.size());
+
+ // test deserialize
+ ASSERT_OK_AND_ASSIGN(auto deletion_vector,
+
DeletionVector::DeserializeFromBytes(serialize_bytes.get(), pool.get()));
+ std::vector<bool> expected = {false, true, true, false, true, false};
+ std::vector<bool> result;
+ for (size_t i = 0; i < expected.size(); i++) {
+ result.emplace_back(deletion_vector->IsDeleted(i).value());
+ }
+ ASSERT_EQ(expected, result);
+
+ // test serialize
+ ASSERT_OK_AND_ASSIGN(auto serialized_dv,
deletion_vector->SerializeToBytes(pool));
+ ASSERT_EQ(*serialized_dv, *serialize_bytes);
+}
+
+TEST(DeletionVectorTest, ReadFromDataInputStreamLengthMismatch) {
+ std::vector<uint8_t> data;
+ AppendInt32BigEndian(&data, /*value=*/8);
+ AppendInt32BigEndian(&data, BitmapDeletionVector::MAGIC_NUMBER);
+
+ auto input_stream = std::make_shared<ByteArrayInputStream>(
+ reinterpret_cast<const char*>(data.data()), data.size());
+ DataInputStream in(input_stream);
+ auto pool = GetDefaultPool();
+ ASSERT_NOK_WITH_MSG(DeletionVector::Read(&in, /*length=*/9, pool.get()),
"Size not match");
+}
+
+TEST(DeletionVectorTest, ReadFromDataInputStreamInvalidBitmapLength) {
+ std::vector<uint8_t> data;
+ AppendInt32BigEndian(&data, /*value=*/3);
+ AppendInt32BigEndian(&data, BitmapDeletionVector::MAGIC_NUMBER);
+
+ auto input_stream = std::make_shared<ByteArrayInputStream>(
+ reinterpret_cast<const char*>(data.data()), data.size());
+ DataInputStream in(input_stream);
+ auto pool = GetDefaultPool();
+
+ ASSERT_NOK_WITH_MSG(DeletionVector::Read(&in, std::nullopt, pool.get()),
+ "Invalid bitmap length");
+}
+
+TEST(DeletionVectorTest, ReadFromDataInputStreamBitmap64NotImplemented) {
+ std::vector<uint8_t> data;
+ AppendInt32BigEndian(&data, /*value=*/8);
+ // Trigger: EndianSwapValue(magic_number) ==
Bitmap64DeletionVector::MAGIC_NUMBER.
+ AppendInt32BigEndian(&data,
EndianSwapValue(Bitmap64DeletionVector::MAGIC_NUMBER));
+
+ auto input_stream = std::make_shared<ByteArrayInputStream>(
+ reinterpret_cast<const char*>(data.data()), data.size());
+ DataInputStream in(input_stream);
+ auto pool = GetDefaultPool();
+
+ ASSERT_NOK_WITH_MSG(
+ DeletionVector::Read(&in, std::nullopt, pool.get()),
+ "NotImplemented: bitmap64 deletion vectors are not supported in this
version");
+}
+
+TEST(DeletionVectorTest, ReadFromDataInputStreamInvalidMagicNumber) {
+ std::vector<uint8_t> data;
+ AppendInt32BigEndian(&data, /*value=*/8);
+ AppendInt32BigEndian(&data, /*value=*/123456789);
+
+ auto input_stream = std::make_shared<ByteArrayInputStream>(
+ reinterpret_cast<const char*>(data.data()), data.size());
+ DataInputStream in(input_stream);
+ auto pool = GetDefaultPool();
+
+ ASSERT_NOK_WITH_MSG(DeletionVector::Read(&in, std::nullopt, pool.get()),
+ "Invalid magic number");
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/core/deletionvectors/deletion_vectors_index_file.cpp
b/src/paimon/core/deletionvectors/deletion_vectors_index_file.cpp
new file mode 100644
index 0000000..7124ee1
--- /dev/null
+++ b/src/paimon/core/deletionvectors/deletion_vectors_index_file.cpp
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "paimon/core/deletionvectors/deletion_vectors_index_file.h"
+
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "paimon/core/deletionvectors/deletion_vector_index_file_writer.h"
+
+namespace paimon {
+
+Result<std::shared_ptr<IndexFileMeta>>
DeletionVectorsIndexFile::WriteSingleFile(
+ const std::map<std::string, std::shared_ptr<DeletionVector>>& input) {
+ return CreateWriter()->WriteSingleFile(input);
+}
+
+std::shared_ptr<DeletionVectorIndexFileWriter>
DeletionVectorsIndexFile::CreateWriter() const {
+ return std::make_shared<DeletionVectorIndexFileWriter>(fs_, path_factory_,
pool_);
+}
+
+Result<std::map<std::string, std::shared_ptr<DeletionVector>>>
+DeletionVectorsIndexFile::ReadAllDeletionVectors(
+ const std::shared_ptr<IndexFileMeta>& file_meta) const {
+ std::optional<LinkedHashMap<std::string, DeletionVectorMeta>>
deletion_vector_metas =
+ file_meta->DvRanges();
+ if (deletion_vector_metas == std::nullopt) {
+ return Status::Invalid(
+ fmt::format("Read all deletion vectors failed from IndexFileMeta
'{}'. Deletion vector "
+ "metas is null",
+ file_meta->FileName()));
+ }
+
+ std::map<std::string, std::shared_ptr<DeletionVector>> deletion_vectors;
+ std::string file_path = path_factory_->ToPath(file_meta);
+ PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<InputStream> input_stream,
fs_->Open(file_path));
+ auto data_input_stream = std::make_shared<DataInputStream>(input_stream);
+ PAIMON_RETURN_NOT_OK(CheckVersion(data_input_stream));
+ for (const auto& [_, deletion_vector_meta] :
deletion_vector_metas.value()) {
+ PAIMON_ASSIGN_OR_RAISE(
+ std::shared_ptr<DeletionVector> dv,
+ DeletionVector::Read(data_input_stream.get(),
+
static_cast<int64_t>(deletion_vector_meta.GetLength()),
+ pool_.get()));
+ deletion_vectors[deletion_vector_meta.GetDataFileName()] = dv;
+ }
+ return deletion_vectors;
+}
+
+Result<std::map<std::string, std::shared_ptr<DeletionVector>>>
+DeletionVectorsIndexFile::ReadAllDeletionVectors(
+ const std::vector<std::shared_ptr<IndexFileMeta>>& index_files) const {
+ std::map<std::string, std::shared_ptr<DeletionVector>> deletion_vectors;
+ for (const auto& index_file : index_files) {
+ std::map<std::string, std::shared_ptr<DeletionVector>>
partial_deletion_vectors;
+ PAIMON_ASSIGN_OR_RAISE(partial_deletion_vectors,
ReadAllDeletionVectors(index_file));
+ for (const auto& [data_file_name, dv] : partial_deletion_vectors) {
+ deletion_vectors[data_file_name] = dv;
+ }
+ }
+ return deletion_vectors;
+}
+
+Status DeletionVectorsIndexFile::CheckVersion(const
std::shared_ptr<DataInputStream>& in) {
+ PAIMON_ASSIGN_OR_RAISE(int8_t version, in->ReadValue<int8_t>());
+ if (version != VERSION_ID_V1) {
+ return Status::Invalid(fmt::format(
+ "Version not match, actual version: {}, expected version: {}",
version, VERSION_ID_V1));
+ }
+ return Status::OK();
+}
+
+} // namespace paimon
diff --git a/src/paimon/core/deletionvectors/deletion_vectors_index_file.h
b/src/paimon/core/deletionvectors/deletion_vectors_index_file.h
new file mode 100644
index 0000000..e488ee2
--- /dev/null
+++ b/src/paimon/core/deletionvectors/deletion_vectors_index_file.h
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <map>
+#include <memory>
+#include <string>
+
+#include "paimon/core/deletionvectors/deletion_vector.h"
+#include "paimon/core/index/index_file.h"
+#include "paimon/core/index/index_file_meta.h"
+
+namespace paimon {
+
+class DeletionVectorIndexFileWriter;
+
+/// DeletionVectors index file.
+class DeletionVectorsIndexFile : public IndexFile {
+ public:
+ static constexpr char DELETION_VECTORS_INDEX[] = "DELETION_VECTORS";
+ static constexpr int8_t VERSION_ID_V1 = 1;
+
+ DeletionVectorsIndexFile(const std::shared_ptr<FileSystem>& fs,
+ const std::shared_ptr<IndexPathFactory>&
path_factory, bool bitmap64,
+ const std::shared_ptr<MemoryPool>& pool)
+ : IndexFile(fs, path_factory), bitmap64_(bitmap64), pool_(pool) {}
+
+ ~DeletionVectorsIndexFile() override = default;
+
+ bool Bitmap64() const {
+ return bitmap64_;
+ }
+
+ Result<std::shared_ptr<IndexFileMeta>> WriteSingleFile(
+ const std::map<std::string, std::shared_ptr<DeletionVector>>& input);
+
+ Result<std::map<std::string, std::shared_ptr<DeletionVector>>>
ReadAllDeletionVectors(
+ const std::shared_ptr<IndexFileMeta>& file_meta) const;
+
+ Result<std::map<std::string, std::shared_ptr<DeletionVector>>>
ReadAllDeletionVectors(
+ const std::vector<std::shared_ptr<IndexFileMeta>>& index_files) const;
+
+ private:
+ static Status CheckVersion(const std::shared_ptr<DataInputStream>& in);
+
+ std::shared_ptr<DeletionVectorIndexFileWriter> CreateWriter() const;
+
+ const bool bitmap64_;
+ std::shared_ptr<MemoryPool> pool_;
+};
+
+} // namespace paimon
diff --git
a/src/paimon/core/deletionvectors/deletion_vectors_index_file_test.cpp
b/src/paimon/core/deletionvectors/deletion_vectors_index_file_test.cpp
new file mode 100644
index 0000000..c0aa02c
--- /dev/null
+++ b/src/paimon/core/deletionvectors/deletion_vectors_index_file_test.cpp
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "paimon/core/deletionvectors/deletion_vectors_index_file.h"
+
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/core/deletionvectors/bitmap_deletion_vector.h"
+#include "paimon/core/index/index_file_meta.h"
+#include "paimon/fs/file_system_factory.h"
+#include "paimon/testing/mock/mock_index_path_factory.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+TEST(DeletionVectorsIndexFileTest, Basic) {
+ auto dir = UniqueTestDirectory::Create();
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<FileSystem> fs,
+ FileSystemFactory::Get("local", dir->Str(), {}));
+ auto path_factory = std::make_shared<MockIndexPathFactory>(dir->Str());
+ auto pool = GetDefaultPool();
+ auto index_file =
+ std::make_shared<DeletionVectorsIndexFile>(fs, path_factory,
/*bitmap64=*/false, pool);
+
+ std::map<std::string, std::shared_ptr<DeletionVector>> input;
+ RoaringBitmap32 roaring_1;
+ for (int32_t i = 0; i < 10; ++i) {
+ roaring_1.Add(i);
+ }
+ input["dv1"] = std::make_shared<BitmapDeletionVector>(roaring_1);
+ RoaringBitmap32 roaring_2;
+ for (int32_t i = 100; i < 110; ++i) {
+ roaring_2.Add(i);
+ }
+ input["dv2"] = std::make_shared<BitmapDeletionVector>(roaring_2);
+
+ ASSERT_FALSE(index_file->Bitmap64());
+ ASSERT_OK_AND_ASSIGN(auto meta, index_file->WriteSingleFile(input));
+ ASSERT_GT(meta->FileSize(), 0);
+ ASSERT_OK_AND_ASSIGN(auto size, index_file->FileSize(meta));
+ ASSERT_EQ(meta->FileSize(), size);
+ ASSERT_EQ(meta->IndexType(),
DeletionVectorsIndexFile::DELETION_VECTORS_INDEX);
+ ASSERT_EQ(meta->FileName(), "index-0");
+ ASSERT_FALSE(index_file->IsExternalPath());
+ ASSERT_EQ(meta->ExternalPath(), std::nullopt);
+
+ // Round trip: write then read all deletion vectors from index file.
+ ASSERT_OK_AND_ASSIGN(auto read_back,
index_file->ReadAllDeletionVectors(meta));
+ ASSERT_EQ(read_back.size(), input.size());
+ ASSERT_EQ(read_back.at("dv1")->GetCardinality(), 10);
+ ASSERT_EQ(read_back.at("dv2")->GetCardinality(), 10);
+
+ ASSERT_OK_AND_ASSIGN(bool is_deleted, read_back.at("dv1")->IsDeleted(0));
+ ASSERT_TRUE(is_deleted);
+ ASSERT_OK_AND_ASSIGN(is_deleted, read_back.at("dv1")->IsDeleted(10));
+ ASSERT_FALSE(is_deleted);
+ ASSERT_OK_AND_ASSIGN(is_deleted, read_back.at("dv2")->IsDeleted(100));
+ ASSERT_TRUE(is_deleted);
+ ASSERT_OK_AND_ASSIGN(is_deleted, read_back.at("dv2")->IsDeleted(99));
+ ASSERT_FALSE(is_deleted);
+}
+
+TEST(DeletionVectorsIndexFileTest, ExternalPathAndIndexFileMeta) {
+ auto dir = UniqueTestDirectory::Create();
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<FileSystem> fs,
+ FileSystemFactory::Get("local", dir->Str(), {}));
+ auto path_factory = std::make_shared<MockIndexPathFactory>(dir->Str());
+ path_factory->SetExternal(true);
+ auto pool = GetDefaultPool();
+ DeletionVectorsIndexFile index_file(fs, path_factory,
+ /*bitmap64=*/false, pool);
+
+ std::map<std::string, std::shared_ptr<DeletionVector>> input;
+ RoaringBitmap32 roaring;
+ for (int32_t i = 0; i < 5; ++i) {
+ roaring.Add(i);
+ }
+ input["dv_ext"] = std::make_shared<BitmapDeletionVector>(roaring);
+
+ ASSERT_OK_AND_ASSIGN(auto meta, index_file.WriteSingleFile(input));
+ ASSERT_EQ(meta->ExternalPath().value(), PathUtil::JoinPath(dir->Str(),
"index-0"));
+
+ // Round trip for external path index file.
+ ASSERT_OK_AND_ASSIGN(auto read_back,
index_file.ReadAllDeletionVectors(meta));
+ ASSERT_EQ(read_back.size(), 1);
+ ASSERT_EQ(read_back.at("dv_ext")->GetCardinality(), 5);
+ ASSERT_OK_AND_ASSIGN(bool is_deleted,
read_back.at("dv_ext")->IsDeleted(0));
+ ASSERT_TRUE(is_deleted);
+ ASSERT_OK_AND_ASSIGN(is_deleted, read_back.at("dv_ext")->IsDeleted(5));
+ ASSERT_FALSE(is_deleted);
+}
+
+TEST(DeletionVectorsIndexFileTest, RoundTripEmptyInput) {
+ auto dir = UniqueTestDirectory::Create();
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<FileSystem> fs,
+ FileSystemFactory::Get("local", dir->Str(), {}));
+ auto path_factory = std::make_shared<MockIndexPathFactory>(dir->Str());
+ auto pool = GetDefaultPool();
+ DeletionVectorsIndexFile index_file(fs, path_factory, /*bitmap64=*/false,
pool);
+
+ std::map<std::string, std::shared_ptr<DeletionVector>> input;
+ ASSERT_OK_AND_ASSIGN(auto meta, index_file.WriteSingleFile(input));
+ ASSERT_EQ(meta->RowCount(), 0);
+ ASSERT_OK_AND_ASSIGN(auto read_back,
index_file.ReadAllDeletionVectors(meta));
+ ASSERT_TRUE(read_back.empty());
+}
+
+TEST(DeletionVectorsIndexFileTest, RoundTripMultipleIndexFilesMerge) {
+ auto dir = UniqueTestDirectory::Create();
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<FileSystem> fs,
+ FileSystemFactory::Get("local", dir->Str(), {}));
+ auto path_factory = std::make_shared<MockIndexPathFactory>(dir->Str());
+ auto pool = GetDefaultPool();
+ DeletionVectorsIndexFile index_file(fs, path_factory, /*bitmap64=*/false,
pool);
+
+ std::map<std::string, std::shared_ptr<DeletionVector>> input1;
+ RoaringBitmap32 roaring_1;
+ roaring_1.Add(1);
+ roaring_1.Add(3);
+ input1["dv_a"] = std::make_shared<BitmapDeletionVector>(roaring_1);
+ ASSERT_OK_AND_ASSIGN(auto meta1, index_file.WriteSingleFile(input1));
+
+ std::map<std::string, std::shared_ptr<DeletionVector>> input2;
+ RoaringBitmap32 roaring_2;
+ roaring_2.Add(8);
+ input2["dv_b"] = std::make_shared<BitmapDeletionVector>(roaring_2);
+ ASSERT_OK_AND_ASSIGN(auto meta2, index_file.WriteSingleFile(input2));
+
+ ASSERT_OK_AND_ASSIGN(auto read_back,
+ index_file.ReadAllDeletionVectors(
+
std::vector<std::shared_ptr<IndexFileMeta>>{meta1, meta2}));
+ ASSERT_EQ(read_back.size(), 2);
+
+ ASSERT_OK_AND_ASSIGN(bool is_deleted, read_back.at("dv_a")->IsDeleted(1));
+ ASSERT_TRUE(is_deleted);
+ ASSERT_OK_AND_ASSIGN(is_deleted, read_back.at("dv_b")->IsDeleted(8));
+ ASSERT_TRUE(is_deleted);
+}
+
+TEST(DeletionVectorsIndexFileTest,
RoundTripMultipleIndexFilesLastWriteWinsOnSameKey) {
+ auto dir = UniqueTestDirectory::Create();
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<FileSystem> fs,
+ FileSystemFactory::Get("local", dir->Str(), {}));
+ auto path_factory = std::make_shared<MockIndexPathFactory>(dir->Str());
+ auto pool = GetDefaultPool();
+ DeletionVectorsIndexFile index_file(fs, path_factory, /*bitmap64=*/false,
pool);
+
+ std::map<std::string, std::shared_ptr<DeletionVector>> input1;
+ RoaringBitmap32 roaring_old;
+ roaring_old.Add(2);
+ input1["same_dv"] = std::make_shared<BitmapDeletionVector>(roaring_old);
+ ASSERT_OK_AND_ASSIGN(auto meta1, index_file.WriteSingleFile(input1));
+
+ std::map<std::string, std::shared_ptr<DeletionVector>> input2;
+ RoaringBitmap32 roaring_new;
+ roaring_new.Add(9);
+ input2["same_dv"] = std::make_shared<BitmapDeletionVector>(roaring_new);
+ ASSERT_OK_AND_ASSIGN(auto meta2, index_file.WriteSingleFile(input2));
+
+ ASSERT_OK_AND_ASSIGN(auto read_back,
+ index_file.ReadAllDeletionVectors(
+
std::vector<std::shared_ptr<IndexFileMeta>>{meta1, meta2}));
+ ASSERT_EQ(read_back.size(), 1);
+
+ ASSERT_OK_AND_ASSIGN(bool is_deleted_old,
read_back.at("same_dv")->IsDeleted(2));
+ ASSERT_FALSE(is_deleted_old);
+ ASSERT_OK_AND_ASSIGN(bool is_deleted_new,
read_back.at("same_dv")->IsDeleted(9));
+ ASSERT_TRUE(is_deleted_new);
+}
+
+} // namespace paimon::test