This is an automated email from the ASF dual-hosted git repository.
zhztheplayer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 7bb00d1632 [VL] Delta: Add native Delta DV reader support (#12040)
7bb00d1632 is described below
commit 7bb00d1632519681e19e6b3cae9700b974ac42ad
Author: Mohammad Linjawi <[email protected]>
AuthorDate: Fri May 22 10:17:15 2026 +0300
[VL] Delta: Add native Delta DV reader support (#12040)
---
cpp/core/compute/Runtime.h | 5 +
cpp/velox/CMakeLists.txt | 7 +-
cpp/velox/compute/VeloxBackend.cc | 7 +
cpp/velox/compute/VeloxBackend.h | 4 +
cpp/velox/compute/VeloxConnectorIds.h | 2 +
cpp/velox/compute/VeloxRuntime.cc | 13 ++
cpp/velox/compute/WholeStageResultIterator.cc | 1 +
cpp/velox/compute/delta/DeltaConnector.cpp | 48 +++++
cpp/velox/compute/delta/DeltaConnector.h | 70 +++++++
cpp/velox/compute/delta/DeltaDataSource.cpp | 98 +++++++++
cpp/velox/compute/delta/DeltaDataSource.h | 78 ++++++++
.../compute/delta/DeltaDeletionVectorReader.cpp | 209 ++++++++++++++++++++
.../compute/delta/DeltaDeletionVectorReader.h | 110 +++++++++++
cpp/velox/compute/delta/DeltaSplit.cpp | 75 +++++++
cpp/velox/compute/delta/DeltaSplit.h | 122 ++++++++++++
cpp/velox/compute/delta/DeltaSplitReader.cpp | 208 +++++++++++++++++++
cpp/velox/compute/delta/DeltaSplitReader.h | 118 +++++++++++
cpp/velox/compute/delta/tests/CMakeLists.txt | 12 ++
.../compute/delta/tests/DeltaConnectorTest.cpp | 180 +++++++++++++++++
.../delta/tests/DeltaDeletionVectorReaderTest.cpp | 219 +++++++++++++++++++++
cpp/velox/compute/delta/tests/DeltaSplitTest.cpp | 95 +++++++++
21 files changed, 1680 insertions(+), 1 deletion(-)
diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index 9d8315731f..4ab944898b 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -56,6 +56,11 @@ struct SparkTaskInfo {
}
};
+struct SplitPayloadBufferView {
+ const uint8_t* data;
+ int32_t size;
+};
+
class Runtime : public std::enable_shared_from_this<Runtime> {
public:
using Factory = std::function<Runtime*(
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index a8b0d668ca..a97c6e60b3 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -160,6 +160,11 @@ set(VELOX_SRCS
compute/VeloxRuntime.cc
compute/VeloxPlanConverter.cc
compute/WholeStageResultIterator.cc
+ compute/delta/DeltaConnector.cpp
+ compute/delta/DeltaDataSource.cpp
+ compute/delta/DeltaDeletionVectorReader.cpp
+ compute/delta/DeltaSplit.cpp
+ compute/delta/DeltaSplitReader.cpp
compute/delta/RoaringBitmapArray.cpp
compute/iceberg/IcebergPlanConverter.cc
jni/JniFileSystem.cc
@@ -403,8 +408,8 @@ find_package(
target_link_libraries(velox PUBLIC ICU::i18n ICU::uc ICU::data)
if(BUILD_TESTS)
- add_subdirectory(tests)
add_subdirectory(compute/delta/tests)
+ add_subdirectory(tests)
endif()
if(BUILD_BENCHMARKS)
diff --git a/cpp/velox/compute/VeloxBackend.cc
b/cpp/velox/compute/VeloxBackend.cc
index 85a8622508..801fc9d835 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -21,6 +21,7 @@
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/task_queue/UnboundedBlockingQueue.h>
+#include "compute/delta/DeltaConnector.h"
#include "operators/functions/RegistrationAllFunctions.h"
#include "operators/plannodes/RowVectorStream.h"
#include "utils/ConfigExtractor.h"
@@ -323,6 +324,12 @@ std::shared_ptr<facebook::velox::connector::Connector>
VeloxBackend::createHiveC
return std::make_shared<velox::connector::hive::HiveConnector>(connectorId,
hiveConnectorConfig_, ioExecutor);
}
+std::shared_ptr<facebook::velox::connector::Connector>
VeloxBackend::createDeltaConnector(
+ const std::string& connectorId,
+ folly::Executor* ioExecutor) const {
+ return std::make_shared<delta::DeltaConnector>(connectorId,
hiveConnectorConfig_, ioExecutor);
+}
+
std::shared_ptr<facebook::velox::connector::Connector>
VeloxBackend::createValueStreamConnector(
const std::string& connectorId,
bool dynamicFilterEnabled) const {
diff --git a/cpp/velox/compute/VeloxBackend.h b/cpp/velox/compute/VeloxBackend.h
index 68791ec0f9..2796ce20c3 100644
--- a/cpp/velox/compute/VeloxBackend.h
+++ b/cpp/velox/compute/VeloxBackend.h
@@ -74,6 +74,10 @@ class VeloxBackend {
const std::string& connectorId,
folly::Executor* ioExecutor) const;
+ std::shared_ptr<facebook::velox::connector::Connector> createDeltaConnector(
+ const std::string& connectorId,
+ folly::Executor* ioExecutor) const;
+
std::shared_ptr<facebook::velox::connector::Connector>
createValueStreamConnector(
const std::string& connectorId,
bool dynamicFilterEnabled) const;
diff --git a/cpp/velox/compute/VeloxConnectorIds.h
b/cpp/velox/compute/VeloxConnectorIds.h
index e6082bae8b..a0e37ba8b6 100644
--- a/cpp/velox/compute/VeloxConnectorIds.h
+++ b/cpp/velox/compute/VeloxConnectorIds.h
@@ -23,9 +23,11 @@ namespace gluten {
struct VeloxConnectorIds {
std::string hive;
+ std::string delta;
std::string iterator;
std::string cudfHive;
bool hiveRegistered{false};
+ bool deltaRegistered{false};
bool iteratorRegistered{false};
bool cudfHiveRegistered{false};
};
diff --git a/cpp/velox/compute/VeloxRuntime.cc
b/cpp/velox/compute/VeloxRuntime.cc
index e3eac17a22..62e6820e9c 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -31,6 +31,7 @@
#include "compute/ResultIterator.h"
#include "compute/Runtime.h"
#include "compute/VeloxPlanConverter.h"
+#include "compute/delta/DeltaConnector.h"
#include "config/VeloxConfig.h"
#include "operators/plannodes/IteratorSplit.h"
#include "operators/serializer/VeloxRowToColumnarConverter.h"
@@ -213,6 +214,7 @@ std::string makeScopedConnectorId(const std::string& base,
uint64_t runtimeId) {
VeloxConnectorIds makeScopedConnectorIds(uint64_t runtimeId) {
return VeloxConnectorIds{
.hive = makeScopedConnectorId(kHiveConnectorId, runtimeId),
+ .delta =
makeScopedConnectorId(delta::DeltaConnectorFactory::kDeltaConnectorName,
runtimeId),
.iterator = makeScopedConnectorId(kIteratorConnectorId, runtimeId),
.cudfHive = makeScopedConnectorId(kCudfHiveConnectorId, runtimeId)};
}
@@ -271,6 +273,13 @@ void VeloxRuntime::registerConnectors() {
velox::connector::hasConnector(connectorIds_.hive),
"Scoped hive connector not found after registration: " +
connectorIds_.hive);
+ connectorIds_.deltaRegistered =
+
velox::connector::registerConnector(backend->createDeltaConnector(connectorIds_.delta,
ioExecutor_.get()));
+ GLUTEN_CHECK(connectorIds_.deltaRegistered, "Failed to register scoped delta
connector: " + connectorIds_.delta);
+ GLUTEN_CHECK(
+ velox::connector::hasConnector(connectorIds_.delta),
+ "Scoped delta connector not found after registration: " +
connectorIds_.delta);
+
const auto valueStreamDynamicFilterEnabled =
veloxCfg_->get<bool>(kValueStreamDynamicFilterEnabled,
kValueStreamDynamicFilterEnabledDefault);
connectorIds_.iteratorRegistered = velox::connector::registerConnector(
@@ -306,6 +315,10 @@ void VeloxRuntime::unregisterConnectors() {
velox::connector::unregisterConnector(connectorIds_.iterator);
connectorIds_.iteratorRegistered = false;
}
+ if (connectorIds_.deltaRegistered) {
+ velox::connector::unregisterConnector(connectorIds_.delta);
+ connectorIds_.deltaRegistered = false;
+ }
if (connectorIds_.hiveRegistered) {
velox::connector::unregisterConnector(connectorIds_.hive);
connectorIds_.hiveRegistered = false;
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc
b/cpp/velox/compute/WholeStageResultIterator.cc
index 2c1effba20..ccc1917f41 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -216,6 +216,7 @@ std::shared_ptr<velox::core::QueryCtx>
WholeStageResultIterator::createNewVeloxQ
std::unordered_map<std::string, std::shared_ptr<velox::config::ConfigBase>>
connectorConfigs;
auto hiveSessionConfig = createHiveConnectorSessionConfig(veloxCfg_);
connectorConfigs[connectorIds_.hive] = hiveSessionConfig;
+ connectorConfigs[connectorIds_.delta] = hiveSessionConfig;
connectorConfigs[connectorIds_.iterator] = hiveSessionConfig;
#ifdef GLUTEN_ENABLE_GPU
if (!connectorIds_.cudfHive.empty()) {
diff --git a/cpp/velox/compute/delta/DeltaConnector.cpp
b/cpp/velox/compute/delta/DeltaConnector.cpp
new file mode 100644
index 0000000000..f25eb7714a
--- /dev/null
+++ b/cpp/velox/compute/delta/DeltaConnector.cpp
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "compute/delta/DeltaConnector.h"
+
+#include "compute/delta/DeltaDataSource.h"
+
+namespace gluten::delta {
+
+std::unique_ptr<DataSource> DeltaConnector::createDataSource(
+ const RowTypePtr& outputType,
+ const ConnectorTableHandlePtr& tableHandle,
+ const ColumnHandleMap& columnHandles,
+ ConnectorQueryCtx* connectorQueryCtx) {
+ return std::make_unique<DeltaDataSource>(
+ outputType, tableHandle, columnHandles, &fileHandleFactory_,
ioExecutor_, connectorQueryCtx, hiveConfig_);
+}
+
+} // namespace gluten::delta
diff --git a/cpp/velox/compute/delta/DeltaConnector.h
b/cpp/velox/compute/delta/DeltaConnector.h
new file mode 100644
index 0000000000..2d3a1d8df6
--- /dev/null
+++ b/cpp/velox/compute/delta/DeltaConnector.h
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "velox/connectors/hive/HiveConnector.h"
+
+namespace gluten::delta {
+
+using namespace facebook::velox;
+using namespace facebook::velox::connector;
+using namespace facebook::velox::connector::hive;
+
+class DeltaConnector final : public HiveConnector {
+ public:
+ DeltaConnector(const std::string& id, std::shared_ptr<const
config::ConfigBase> config, folly::Executor* ioExecutor)
+ : HiveConnector(id, std::move(config), ioExecutor) {}
+
+ std::unique_ptr<DataSource> createDataSource(
+ const RowTypePtr& outputType,
+ const ConnectorTableHandlePtr& tableHandle,
+ const ColumnHandleMap& columnHandles,
+ ConnectorQueryCtx* connectorQueryCtx) override;
+};
+
+class DeltaConnectorFactory final : public ConnectorFactory {
+ public:
+ static constexpr const char* kDeltaConnectorName = "delta";
+
+ DeltaConnectorFactory() : ConnectorFactory(kDeltaConnectorName) {}
+
+ std::shared_ptr<Connector> newConnector(
+ const std::string& id,
+ std::shared_ptr<const config::ConfigBase> config,
+ folly::Executor* ioExecutor = nullptr,
+ [[maybe_unused]] folly::Executor* cpuExecutor = nullptr) override {
+ return std::make_shared<DeltaConnector>(id, std::move(config), ioExecutor);
+ }
+};
+
+} // namespace gluten::delta
diff --git a/cpp/velox/compute/delta/DeltaDataSource.cpp
b/cpp/velox/compute/delta/DeltaDataSource.cpp
new file mode 100644
index 0000000000..f8c00c2b07
--- /dev/null
+++ b/cpp/velox/compute/delta/DeltaDataSource.cpp
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "compute/delta/DeltaDataSource.h"
+
+#include "compute/delta/DeltaSplitReader.h"
+
+namespace gluten::delta {
+
+DeltaDataSource::DeltaDataSource(
+ const RowTypePtr& outputType,
+ const ConnectorTableHandlePtr& tableHandle,
+ const ColumnHandleMap& assignments,
+ FileHandleFactory* fileHandleFactory,
+ folly::Executor* ioExecutor,
+ const ConnectorQueryCtx* connectorQueryCtx,
+ const std::shared_ptr<HiveConfig>& hiveConfig)
+ : HiveDataSource(
+ outputType,
+ tableHandle,
+ assignments,
+ fileHandleFactory,
+ ioExecutor,
+ connectorQueryCtx,
+ hiveConfig) {}
+
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+std::unique_ptr<FileSplitReader> DeltaDataSource::createSplitReader() {
+ auto bucketChannels = prepareSplit();
+ auto deltaSplit = checkedPointerCast<const HiveDeltaSplit>(split_);
+
+ return std::make_unique<DeltaSplitReader>(
+ deltaSplit,
+ tableHandle_,
+ &partitionKeys_,
+ connectorQueryCtx_,
+ fileConfig_,
+ readerOutputType_,
+ dataIoStats_,
+ metadataIoStats_,
+ ioStats_,
+ fileHandleFactory_,
+ ioExecutor_,
+ scanSpec_,
+ &infoColumns_,
+ std::move(bucketChannels),
+ /*subfieldFiltersForValidation=*/getFilters());
+}
+#else
+std::unique_ptr<SplitReader> DeltaDataSource::createSplitReader() {
+ auto deltaSplit = checkedPointerCast<const HiveDeltaSplit>(split_);
+
+ return std::make_unique<DeltaSplitReader>(
+ deltaSplit,
+ hiveTableHandle_,
+ &partitionKeys_,
+ connectorQueryCtx_,
+ hiveConfig_,
+ readerOutputType_,
+ ioStatistics_,
+ ioStats_,
+ fileHandleFactory_,
+ ioExecutor_,
+ scanSpec_,
+ /*subfieldFiltersForValidation=*/getFilters());
+}
+#endif
+
+} // namespace gluten::delta
diff --git a/cpp/velox/compute/delta/DeltaDataSource.h
b/cpp/velox/compute/delta/DeltaDataSource.h
new file mode 100644
index 0000000000..a9fe1a4753
--- /dev/null
+++ b/cpp/velox/compute/delta/DeltaDataSource.h
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "velox/connectors/hive/HiveDataSource.h"
+
+#ifndef GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+#if __has_include("velox/connectors/hive/FileSplitReader.h")
+#define GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER 1
+#else
+#define GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER 0
+#endif
+#endif
+
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+#include "velox/connectors/hive/FileSplitReader.h"
+#else
+namespace facebook::velox::connector::hive {
+class SplitReader;
+}
+#endif
+
+namespace gluten::delta {
+
+using namespace facebook::velox;
+using namespace facebook::velox::connector;
+using namespace facebook::velox::connector::hive;
+
+class DeltaDataSource : public HiveDataSource {
+ public:
+ DeltaDataSource(
+ const RowTypePtr& outputType,
+ const ConnectorTableHandlePtr& tableHandle,
+ const ColumnHandleMap& assignments,
+ FileHandleFactory* fileHandleFactory,
+ folly::Executor* ioExecutor,
+ const ConnectorQueryCtx* connectorQueryCtx,
+ const std::shared_ptr<HiveConfig>& hiveConfig);
+
+ protected:
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+ std::unique_ptr<FileSplitReader> createSplitReader() override;
+#else
+ std::unique_ptr<SplitReader> createSplitReader() override;
+#endif
+};
+
+} // namespace gluten::delta
diff --git a/cpp/velox/compute/delta/DeltaDeletionVectorReader.cpp
b/cpp/velox/compute/delta/DeltaDeletionVectorReader.cpp
new file mode 100644
index 0000000000..a48060d060
--- /dev/null
+++ b/cpp/velox/compute/delta/DeltaDeletionVectorReader.cpp
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "compute/delta/DeltaDeletionVectorReader.h"
+
+#include <cstring>
+#include "velox/common/base/BitUtil.h"
+#include "velox/common/base/Exceptions.h"
+
+namespace gluten::delta {
+
+namespace {
+
+constexpr uint64_t kDeltaBitmapArrayMagicBytes = 4;
+constexpr uint64_t kDeltaNativeBitmapArrayLengthBytes = 4;
+constexpr uint64_t kDeltaStoredPayloadLengthBytes = 4;
+constexpr uint32_t kDeltaPortableBitmapArrayMagicNumber = 1681511377;
+constexpr uint32_t kDeltaNativeBitmapArrayMagicNumber = 1681511376;
+
+uint32_t readUint32LittleEndian(const char* data) {
+ const auto* bytes = reinterpret_cast<const uint8_t*>(data);
+ return static_cast<uint32_t>(bytes[0]) | (static_cast<uint32_t>(bytes[1]) <<
8) |
+ (static_cast<uint32_t>(bytes[2]) << 16) |
(static_cast<uint32_t>(bytes[3]) << 24);
+}
+
+uint64_t readUint64LittleEndian(const char* data) {
+ const auto* bytes = reinterpret_cast<const uint8_t*>(data);
+ return static_cast<uint64_t>(bytes[0]) | (static_cast<uint64_t>(bytes[1]) <<
8) |
+ (static_cast<uint64_t>(bytes[2]) << 16) |
(static_cast<uint64_t>(bytes[3]) << 24) |
+ (static_cast<uint64_t>(bytes[4]) << 32) |
(static_cast<uint64_t>(bytes[5]) << 40) |
+ (static_cast<uint64_t>(bytes[6]) << 48) |
(static_cast<uint64_t>(bytes[7]) << 56);
+}
+
+roaring::Roaring64Map deserializeDeltaBitmapArray(std::string_view
serializedPayload, const std::string& dvPath) {
+ VELOX_USER_CHECK_GE(
+ serializedPayload.size(),
+ kDeltaBitmapArrayMagicBytes,
+ "Deletion vector payload is too small for Delta bitmap array: {}",
+ dvPath);
+
+ const auto magic = readUint32LittleEndian(serializedPayload.data());
+ if (magic == kDeltaPortableBitmapArrayMagicNumber) {
+ const auto portablePayload =
serializedPayload.substr(kDeltaBitmapArrayMagicBytes);
+ return roaring::Roaring64Map::readSafe(portablePayload.data(),
portablePayload.size());
+ }
+
+ if (magic == kDeltaNativeBitmapArrayMagicNumber) {
+ VELOX_USER_CHECK_GE(
+ serializedPayload.size(),
+ kDeltaBitmapArrayMagicBytes + kDeltaNativeBitmapArrayLengthBytes,
+ "Deletion vector payload is too small for Delta native bitmap array:
{}",
+ dvPath);
+
+ const auto bitmapCount = readUint32LittleEndian(serializedPayload.data() +
kDeltaBitmapArrayMagicBytes);
+ size_t offset = kDeltaBitmapArrayMagicBytes +
kDeltaNativeBitmapArrayLengthBytes;
+ roaring::Roaring64Map result;
+
+ for (uint64_t bitmapIndex = 0; bitmapIndex < bitmapCount; ++bitmapIndex) {
+ VELOX_USER_CHECK_LE(
+ offset + kDeltaStoredPayloadLengthBytes,
+ serializedPayload.size(),
+ "Deletion vector payload ended before bitmap {} size for {}",
+ bitmapIndex,
+ dvPath);
+
+ const auto bitmapSize = readUint32LittleEndian(serializedPayload.data()
+ offset);
+ offset += kDeltaStoredPayloadLengthBytes;
+
+ VELOX_USER_CHECK_LE(
+ offset + bitmapSize,
+ serializedPayload.size(),
+ "Deletion vector bitmap {} range exceeds payload size for {}",
+ bitmapIndex,
+ dvPath);
+
+ auto bitmap = roaring::Roaring::readSafe(serializedPayload.data() +
offset, bitmapSize);
+ VELOX_USER_CHECK_EQ(
+ bitmap.getSizeInBytes(true),
+ bitmapSize,
+ "Deletion vector bitmap {} size mismatch for {}: expected {}, got
{}",
+ bitmapIndex,
+ dvPath,
+ bitmapSize,
+ bitmap.getSizeInBytes(true));
+
+ const uint64_t rowBase = bitmapIndex << 32;
+ for (auto it = bitmap.begin(); it != bitmap.end(); ++it) {
+ result.add(rowBase | static_cast<uint64_t>(*it));
+ }
+ offset += bitmapSize;
+ }
+
+ VELOX_USER_CHECK_EQ(
+ offset,
+ serializedPayload.size(),
+ "Deletion vector payload has {} unexpected trailing bytes for {}",
+ serializedPayload.size() - offset,
+ dvPath);
+ return result;
+ }
+
+ VELOX_USER_FAIL("Unexpected Delta bitmap array magic number {} for {}",
magic, dvPath);
+}
+
+} // namespace
+
+void DeltaDeletionVectorReader::loadSerializedDeletionVectorInternal(
+ std::string_view serializedPayload,
+ const std::string& debugName,
+ std::optional<uint64_t> expectedCardinality) {
+ VELOX_USER_CHECK_GT(serializedPayload.size(), 0, "Serialized deletion vector
is empty: {}", debugName);
+
+ deletionBitmap_ = deserializeDeltaBitmapArray(serializedPayload, debugName);
+
+ if (expectedCardinality.has_value()) {
+ const auto actualCardinality = deletionBitmap_->cardinality();
+ VELOX_USER_CHECK_EQ(
+ actualCardinality,
+ expectedCardinality.value(),
+ "Deletion vector cardinality mismatch for {}: expected {}, got {}",
+ debugName,
+ expectedCardinality.value(),
+ actualCardinality);
+ }
+}
+
+void DeltaDeletionVectorReader::loadSerializedDeletionVector(
+ std::string_view serializedPayload,
+ std::optional<uint64_t> expectedCardinality) {
+ try {
+ loadSerializedDeletionVectorInternal(serializedPayload, "serialized
deletion vector", expectedCardinality);
+ } catch (const std::exception& e) {
+ VELOX_USER_FAIL("Failed to load serialized deletion vector: {}", e.what());
+ }
+}
+
+bool DeltaDeletionVectorReader::isRowDeleted(uint64_t rowPosition) {
+ if (!deletionBitmap_.has_value()) {
+ return false;
+ }
+
+ return deletionBitmap_->contains(rowPosition);
+}
+
+void DeltaDeletionVectorReader::applyDeletionFilter(uint64_t baseReadOffset,
uint64_t size, BufferPtr deleteBitmap) {
+ VELOX_CHECK_NOT_NULL(deleteBitmap, "Delete bitmap buffer is required");
+
+ if (!deletionBitmap_.has_value()) {
+ std::memset(deleteBitmap->asMutable<uint8_t>(), 0, bits::nbytes(size));
+ deleteBitmap->setSize(0);
+ return;
+ }
+
+ auto* rawBitmap = deleteBitmap->asMutable<uint64_t>();
+ std::memset(rawBitmap, 0, bits::nbytes(size));
+
+ bool hasDeletedRows = false;
+ uint64_t highestDeletedIndex = 0;
+ for (uint64_t i = 0; i < size; ++i) {
+ const uint64_t absoluteRowPos = baseReadOffset + i;
+ if (deletionBitmap_->contains(absoluteRowPos)) {
+ bits::setBit(rawBitmap, i);
+ hasDeletedRows = true;
+ highestDeletedIndex = i;
+ }
+ }
+
+ deleteBitmap->setSize(hasDeletedRows ? bits::nbytes(highestDeletedIndex + 1)
: 0);
+}
+
+uint64_t DeltaDeletionVectorReader::estimatedDeletedRowCount() const {
+ if (!deletionBitmap_.has_value()) {
+ return 0;
+ }
+
+ // Return actual cardinality instead of estimated size
+ return deletionBitmap_->cardinality();
+}
+
+} // namespace gluten::delta
diff --git a/cpp/velox/compute/delta/DeltaDeletionVectorReader.h
b/cpp/velox/compute/delta/DeltaDeletionVectorReader.h
new file mode 100644
index 0000000000..7eac6ea660
--- /dev/null
+++ b/cpp/velox/compute/delta/DeltaDeletionVectorReader.h
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "velox/common/base/BitUtil.h"
+#include "velox/vector/ComplexVector.h"
+
+#include <memory>
+#include <optional>
+#include <roaring/roaring64map.hh>
+#include <string>
+#include <string_view>
+
+namespace gluten::delta {
+
+using namespace facebook::velox;
+
+/// Reads and manages Delta Lake deletion vectors for filtering deleted rows
+/// during table scans.
+///
+/// The JVM Delta side materializes the deletion vector and hands the serialized
+/// bitmap payload to native. This reader only deserializes that payload and
+/// applies row filtering during scan.
+///
+/// Usage example:
+/// @code
+/// auto reader = std::make_unique<DeltaDeletionVectorReader>();
+/// reader->loadSerializedDeletionVector(serializedPayload, expectedCardinality);
+/// if (reader->isRowDeleted(42)) {
+/// // Skip this row during scan
+/// }
+/// @endcode
+class DeltaDeletionVectorReader {
+ public:
+ DeltaDeletionVectorReader() = default;
+
+ /// Loads a deletion vector from an already decoded serialized Delta payload.
+ /// @param serializedPayload Materialized Delta DV bitmap payload.
+ /// @param expectedCardinality Optional number of deleted row positions from
+ /// Delta metadata. When set, the reader verifies this value against the
+ /// deserialized bitmap cardinality and fails loading on mismatch.
+ void loadSerializedDeletionVector(
+ std::string_view serializedPayload,
+ std::optional<uint64_t> expectedCardinality = std::nullopt);
+
+ /// Checks if a specific row position is marked as deleted.
+ /// Note: This method is declared non-const, although the current
+ /// implementation only reads the loaded bitmap.
+ /// @param rowPosition 0-based row position in the data file
+ /// @return true if the row is deleted, false otherwise
+ bool isRowDeleted(uint64_t rowPosition);
+
+ /// Applies deletion filter to a batch of rows, updating the deletion bitmap.
+ /// This is called during scan to mark deleted rows in the output bitmap.
+ /// @param baseReadOffset Starting row position for this batch (absolute)
+ /// @param size Number of rows in the batch
+ /// @param deleteBitmap Output bitmap marking deleted rows (1 = deleted, 0 =
+ /// keep)
+ void applyDeletionFilter(uint64_t baseReadOffset, uint64_t size, BufferPtr
deleteBitmap);
+
+ /// Returns true if no deletion vector is loaded.
+ bool empty() const {
+ return !deletionBitmap_.has_value();
+ }
+
+ /// Returns the number of deleted rows in the loaded DV, or 0 if none is loaded.
+ /// Note: Despite the name, this is the bitmap's actual cardinality.
+ uint64_t estimatedDeletedRowCount() const;
+
+ private:
+ void loadSerializedDeletionVectorInternal(
+ std::string_view serializedPayload,
+ const std::string& debugName,
+ std::optional<uint64_t> expectedCardinality);
+
+ // The loaded deletion vector bitmap
+ std::optional<roaring::Roaring64Map> deletionBitmap_;
+};
+
+} // namespace gluten::delta
diff --git a/cpp/velox/compute/delta/DeltaSplit.cpp
b/cpp/velox/compute/delta/DeltaSplit.cpp
new file mode 100644
index 0000000000..830a326be1
--- /dev/null
+++ b/cpp/velox/compute/delta/DeltaSplit.cpp
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "compute/delta/DeltaSplit.h"
+
+namespace gluten::delta {
+
+/// Builds a Delta-aware Hive split. The generic split arguments are forwarded
+/// to HiveConnectorSplit; the deletion vector descriptor, file statistics and
+/// row-index filter type are stored on this object.
+HiveDeltaSplit::HiveDeltaSplit(
+    const std::string& connectorId,
+    const std::string& filePath,
+    dwio::common::FileFormat fileFormat,
+    uint64_t start,
+    uint64_t length,
+    const std::unordered_map<std::string, std::optional<std::string>>& partitionKeys,
+    std::optional<int32_t> tableBucketNumber,
+    const std::unordered_map<std::string, std::string>& customSplitInfo,
+    const std::shared_ptr<std::string>& extraFileInfo,
+    const std::unordered_map<std::string, std::string>& serdeParameters,
+    bool cacheable,
+    std::optional<DeltaDeletionVectorDescriptor> deletionVector,
+    std::optional<DeltaFileStatistics> statistics,
+    DeltaRowIndexFilterType filterType,
+    const std::unordered_map<std::string, std::string>& infoColumns,
+    std::optional<FileProperties> fileProperties)
+    : HiveConnectorSplit(
+          connectorId,
+          filePath,
+          fileFormat,
+          start,
+          length,
+          partitionKeys,
+          tableBucketNumber,
+          customSplitInfo,
+          extraFileInfo,
+          serdeParameters,
+          /*splitWeight=*/0,
+          cacheable,
+          infoColumns,
+          fileProperties,
+          // NOTE(review): the two trailing optional base-class arguments are
+          // left unset; confirm which HiveConnectorSplit parameters they map
+          // to in the pinned Velox version.
+          std::nullopt,
+          std::nullopt),
+      deletionVector(std::move(deletionVector)),
+      statistics(std::move(statistics)),
+      filterType(filterType) {}
+
+} // namespace gluten::delta
diff --git a/cpp/velox/compute/delta/DeltaSplit.h b/cpp/velox/compute/delta/DeltaSplit.h
new file mode 100644
index 0000000000..df05e62fc3
--- /dev/null
+++ b/cpp/velox/compute/delta/DeltaSplit.h
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <limits>
+#include <optional>
+#include <vector>
+
+#include "compute/Runtime.h"
+#include "velox/connectors/hive/HiveConnectorSplit.h"
+
+namespace gluten::delta {
+
+using namespace facebook::velox;
+using namespace facebook::velox::connector;
+using namespace facebook::velox::connector::hive;
+
+/// Selects how a loaded deletion vector is turned into the row mask that
+/// DeltaSplitReader::next() hands to the base row reader.
+enum class DeltaRowIndexFilterType {
+  // DeltaSplitReader::next() handles this value exactly like kIfContained:
+  // a loaded deletion vector is still applied.
+  // NOTE(review): confirm that is intended for a value named "keep all".
+  kKeepAll,
+  // Rows whose position is contained in the deletion vector are marked
+  // deleted.
+  kIfContained,
+  // The mask is negated in DeltaSplitReader::next(): rows whose position is
+  // NOT contained in the deletion vector are marked deleted.
+  kIfNotContained,
+};
+
+/// Describes the deletion vector attached to a Delta data file split.
+struct DeltaDeletionVectorDescriptor {
+  /// Number of deleted row positions, when provided.
+  std::optional<uint64_t> cardinality;
+  /// View over the serialized deletion vector bytes, when materialized.
+  /// NOTE(review): presumably non-owning, so the underlying buffer has to
+  /// outlive this descriptor; confirm against SplitPayloadBufferView.
+  std::optional<SplitPayloadBufferView> serializedPayloadView;
+
+  /// Builds a descriptor from an optional cardinality and an optional payload
+  /// view; both default to "absent".
+  static DeltaDeletionVectorDescriptor serialized(
+      std::optional<uint64_t> cardinality = std::nullopt,
+      std::optional<SplitPayloadBufferView> serializedPayloadView = std::nullopt) {
+    return {cardinality, serializedPayloadView};
+  }
+
+  /// Returns true when a serialized payload view is present.
+  bool hasMaterializedPayload() const {
+    return serializedPayloadView.has_value();
+  }
+};
+
+/// File-level statistics for a Delta data file.
+/// Used to validate consistency with deletion vectors and
+/// calculate logical row counts.
+struct DeltaFileStatistics {
+  /// Physical number of rows in the Parquet file.
+  /// Required when deletion vector is present (per Delta spec).
+  std::optional<int64_t> numRecords;
+
+  /// Whether column statistics (min/max) are tight bounds.
+  /// - true: min/max values exist in the valid (non-deleted) rows
+  /// - false: min/max are bounds only, may not exist in valid rows
+  /// When false with a DV, statistics may be stale and unsuitable
+  /// for aggregations like max(column).
+  std::optional<bool> tightBounds;
+
+  /// Calculate the logical row count accounting for deletion vectors.
+  /// Returns the number of valid (non-deleted) rows.
+  /// Returns -1 if numRecords is not available.
+  /// When both numRecords and the deletion vector cardinality are known, the
+  /// result is clamped to 0, so an inconsistent pair (cardinality larger than
+  /// numRecords) can never produce a negative count that would be mistaken
+  /// for the -1 "unknown" sentinel.
+  int64_t logicalRowCount(const std::optional<DeltaDeletionVectorDescriptor>& dv) const {
+    if (!numRecords.has_value()) {
+      return -1; // Unknown
+    }
+    if (!dv.has_value() || !dv->cardinality.has_value()) {
+      return *numRecords; // No deletion count to subtract
+    }
+
+    const int64_t physicalRows = *numRecords;
+    const uint64_t deletedRows = *dv->cardinality;
+    // Compare in the unsigned domain: casting a huge cardinality to int64_t
+    // first would wrap around to a negative value.
+    if (physicalRows <= 0 || deletedRows >= static_cast<uint64_t>(physicalRows)) {
+      return 0;
+    }
+    // Safe: deletedRows < physicalRows <= INT64_MAX at this point.
+    return physicalRows - static_cast<int64_t>(deletedRows);
+  }
+};
+
+/// Hive connector split extended with the Delta-specific information needed
+/// to apply a deletion vector while scanning the file.
+struct HiveDeltaSplit : public connector::hive::HiveConnectorSplit {
+  /// Deletion vector attached to the data file, if any.
+  std::optional<DeltaDeletionVectorDescriptor> deletionVector;
+  /// File-level statistics, if provided.
+  std::optional<DeltaFileStatistics> statistics;
+  /// How the deletion vector is interpreted when filtering rows.
+  DeltaRowIndexFilterType filterType;
+
+  /// All arguments after `fileFormat` are optional; by default the split
+  /// covers the whole file and carries no deletion vector or statistics.
+  HiveDeltaSplit(
+      const std::string& connectorId,
+      const std::string& filePath,
+      dwio::common::FileFormat fileFormat,
+      uint64_t start = 0,
+      uint64_t length = std::numeric_limits<uint64_t>::max(),
+      const std::unordered_map<std::string, std::optional<std::string>>& partitionKeys = {},
+      std::optional<int32_t> tableBucketNumber = std::nullopt,
+      const std::unordered_map<std::string, std::string>& customSplitInfo = {},
+      const std::shared_ptr<std::string>& extraFileInfo = {},
+      const std::unordered_map<std::string, std::string>& serdeParameters = {},
+      bool cacheable = true,
+      std::optional<DeltaDeletionVectorDescriptor> deletionVector = std::nullopt,
+      std::optional<DeltaFileStatistics> statistics = std::nullopt,
+      DeltaRowIndexFilterType filterType = DeltaRowIndexFilterType::kKeepAll,
+      const std::unordered_map<std::string, std::string>& infoColumns = {},
+      std::optional<FileProperties> fileProperties = std::nullopt);
+};
+
+} // namespace gluten::delta
diff --git a/cpp/velox/compute/delta/DeltaSplitReader.cpp b/cpp/velox/compute/delta/DeltaSplitReader.cpp
new file mode 100644
index 0000000000..0b71cf64cf
--- /dev/null
+++ b/cpp/velox/compute/delta/DeltaSplitReader.cpp
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "compute/delta/DeltaSplitReader.h"
+
+#include <string_view>
+
+#include "compute/delta/DeltaSplit.h"
+#include "velox/connectors/hive/HiveConfig.h"
+#include "velox/dwio/common/BufferUtil.h"
+
+using namespace facebook::velox::dwio::common;
+
+namespace gluten::delta {
+
+/// Constructs the Delta split reader by forwarding the arguments to the base
+/// split reader chosen at compile time through
+/// GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER. The reader starts at row number
+/// 0 with no delete bitmap; the deletion vector itself is loaded later in
+/// prepareSplit().
+DeltaSplitReader::DeltaSplitReader(
+    const std::shared_ptr<const HiveDeltaSplit>& hiveSplit,
+    const DeltaTableHandlePtr& tableHandle,
+    const DeltaColumnHandleMap* partitionKeys,
+    const ConnectorQueryCtx* connectorQueryCtx,
+    const std::shared_ptr<const DeltaConfig>& fileConfig,
+    const RowTypePtr& readerOutputType,
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+    const std::shared_ptr<io::IoStatistics>& dataIoStats,
+    const std::shared_ptr<io::IoStatistics>& metadataIoStats,
+#else
+    const std::shared_ptr<io::IoStatistics>& ioStatistics,
+#endif
+    const std::shared_ptr<IoStats>& ioStats,
+    FileHandleFactory* fileHandleFactory,
+    folly::Executor* executor,
+    const std::shared_ptr<common::ScanSpec>& scanSpec,
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+    const std::unordered_map<std::string, FileColumnHandlePtr>* infoColumns,
+    std::vector<column_index_t> bucketChannels,
+#endif
+    const common::SubfieldFilters* subfieldFiltersForValidation)
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+    // Base class variant with separate data/metadata IO statistics, info
+    // columns and bucket channels.
+    : HiveSplitReader(
+          hiveSplit,
+          tableHandle,
+          partitionKeys,
+          connectorQueryCtx,
+          fileConfig,
+          readerOutputType,
+          dataIoStats,
+          metadataIoStats,
+          ioStats,
+          fileHandleFactory,
+          executor,
+          scanSpec,
+          infoColumns,
+          std::move(bucketChannels),
+          subfieldFiltersForValidation),
+#else
+    // Base class variant with a single IO statistics object.
+    : SplitReader(
+          hiveSplit,
+          tableHandle,
+          partitionKeys,
+          connectorQueryCtx,
+          fileConfig,
+          readerOutputType,
+          ioStatistics,
+          ioStats,
+          fileHandleFactory,
+          executor,
+          scanSpec,
+          subfieldFiltersForValidation),
+#endif
+      baseReadRowNumber_(0),
+      deleteBitmap_(nullptr) {
+}
+
+/// Prepares the base reader for the split and, when the split carries a
+/// deletion vector, validates the file statistics against it and loads the
+/// materialized payload into a fresh DeltaDeletionVectorReader.
+/// Fails with a user error if the split has a deletion vector whose payload
+/// was not materialized.
+void DeltaSplitReader::prepareSplit(
+    std::shared_ptr<common::MetadataFilter> metadataFilter,
+    dwio::common::RuntimeStatistics& runtimeStats,
+    const folly::F14FastMap<std::string, std::string>& fileReadOps) {
+  // Clear the per-split deletion vector state first, so that none of the
+  // early returns below can leave state from a previous call behind.
+  baseReadRowNumber_ = 0;
+  deleteBitmap_.reset();
+  deletionVectorReader_.reset();
+
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+  HiveSplitReader::prepareSplit(std::move(metadataFilter), runtimeStats, fileReadOps);
+  if (emptySplit() || !baseRowReader_) {
+    return;
+  }
+#else
+  SplitReader::prepareSplit(std::move(metadataFilter), runtimeStats, fileReadOps);
+  if (emptySplit_ || !baseRowReader_) {
+    return;
+  }
+#endif
+
+  auto deltaSplit = checkedPointerCast<const HiveDeltaSplit>(hiveSplit_);
+  if (!deltaSplit->deletionVector.has_value()) {
+    // No deletion vector: next() reads the file without a row mask.
+    return;
+  }
+
+  const auto& descriptor = *deltaSplit->deletionVector;
+
+  // Validate statistics if provided
+  if (deltaSplit->statistics.has_value()) {
+    validateStatisticsForDeletionVectors(*deltaSplit->statistics, descriptor);
+  }
+
+  VELOX_USER_CHECK(
+      descriptor.hasMaterializedPayload(),
+      "Delta deletion vector payload was not materialized on the JVM side for split {}",
+      hiveSplit_->filePath);
+
+  deletionVectorReader_ = std::make_unique<DeltaDeletionVectorReader>();
+  const auto& payloadView = descriptor.serializedPayloadView.value();
+  // The reader receives the payload as a string_view over the split's buffer
+  // together with the optional cardinality from the descriptor.
+  deletionVectorReader_->loadSerializedDeletionVector(
+      std::string_view(reinterpret_cast<const char*>(payloadView.data), payloadView.size), descriptor.cardinality);
+}
+
+/// Reads the next batch of up to `size` rows into `output`, passing the rows
+/// flagged by the deletion vector (if one is loaded) to the base row reader
+/// as deleted rows. Returns the value reported by the base row reader, or 0
+/// once the reader is at the end.
+uint64_t DeltaSplitReader::next(uint64_t size, VectorPtr& output) {
+  Mutation mutation;
+  mutation.randomSkip = baseReaderOpts_.randomSkip().get();
+  mutation.deletedRows = nullptr;
+
+  const auto batchRows = baseRowReader_->nextReadSize(size);
+  // Position of the first row of this batch; the deletion vector is looked up
+  // starting from this row number.
+  baseReadRowNumber_ = baseRowReader_->nextRowNumber();
+  if (batchRows == RowReader::kAtEnd) {
+    return 0;
+  }
+
+  const auto split = checkedPointerCast<const HiveDeltaSplit>(hiveSplit_);
+  const bool hasDeletionVector = deletionVectorReader_ && !deletionVectorReader_->empty();
+  if (hasDeletionVector) {
+    const auto bitmapBytes = bits::nbytes(batchRows);
+    ensureCapacity<int8_t>(deleteBitmap_, bitmapBytes, connectorQueryCtx_->memoryPool(), false, true);
+    deleteBitmap_->setSize(bitmapBytes);
+    deletionVectorReader_->applyDeletionFilter(baseReadRowNumber_, batchRows, deleteBitmap_);
+
+    const bool invertMask = split->filterType == DeltaRowIndexFilterType::kIfNotContained;
+    if (invertMask) {
+      // Flip the mask so that rows absent from the deletion vector are the
+      // ones marked as deleted.
+      bits::negate(deleteBitmap_->asMutable<uint64_t>(), batchRows);
+      deleteBitmap_->setSize(bitmapBytes);
+    }
+  } else if (deleteBitmap_) {
+    // An empty bitmap means "no row mask" below.
+    deleteBitmap_->setSize(0);
+  }
+
+  if (deleteBitmap_ && deleteBitmap_->size() > 0) {
+    mutation.deletedRows = deleteBitmap_->as<uint64_t>();
+  } else {
+    mutation.deletedRows = nullptr;
+  }
+
+  auto rowsScanned = baseRowReader_->next(batchRows, output, &mutation);
+  const bool needsBucketConversion = rowsScanned > 0 && output->size() > 0 && !bucketChannels().empty();
+  if (needsBucketConversion) {
+    applyBucketConversion(output, bucketConversionRows(*output->asChecked<RowVector>()));
+  }
+  return rowsScanned;
+}
+
+/// Checks that the file statistics are consistent with the deletion vector.
+/// Fails with a user error when numRecords is missing or when the deletion
+/// vector cardinality is larger than numRecords; logs a warning when the
+/// statistics are flagged as loose bounds (tightBounds == false).
+void DeltaSplitReader::validateStatisticsForDeletionVectors(
+    const DeltaFileStatistics& stats,
+    const DeltaDeletionVectorDescriptor& dv) {
+  // Per Delta spec: numRecords is required when DV is present
+  if (!stats.numRecords.has_value()) {
+    VELOX_USER_FAIL(
+        "File statistics must include numRecords when deletion vector "
+        "is present. This is required by the Delta Lake protocol.");
+  }
+  const int64_t numRecords = *stats.numRecords;
+
+  // Validate cardinality doesn't exceed numRecords. The comparison is done in
+  // the unsigned domain: casting the cardinality to int64_t would wrap values
+  // above INT64_MAX to negative numbers and let them pass the check.
+  if (dv.cardinality.has_value() && (numRecords < 0 || *dv.cardinality > static_cast<uint64_t>(numRecords))) {
+    VELOX_USER_FAIL(
+        "Deletion vector cardinality ({}) exceeds file numRecords ({}). "
+        "This indicates data corruption or an invalid deletion vector.",
+        *dv.cardinality,
+        numRecords);
+  }
+
+  // Log warning if tightBounds is false (statistics may be stale)
+  if (stats.tightBounds.has_value() && !*stats.tightBounds) {
+    LOG(WARNING) << "File has deletion vector with loose bounds (tightBounds=false). "
+                 << "Column statistics (min/max) may not be accurate for aggregations. "
+                 << "Consider running OPTIMIZE to compact the deletion vector.";
+  }
+}
+
+} // namespace gluten::delta
diff --git a/cpp/velox/compute/delta/DeltaSplitReader.h b/cpp/velox/compute/delta/DeltaSplitReader.h
new file mode 100644
index 0000000000..a9d4d2940c
--- /dev/null
+++ b/cpp/velox/compute/delta/DeltaSplitReader.h
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "compute/delta/DeltaDeletionVectorReader.h"
+#include "compute/delta/DeltaSplit.h"
+
+#ifndef GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+#if __has_include("velox/connectors/hive/FileSplitReader.h")
+#define GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER 1
+#else
+#define GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER 0
+#endif
+#endif
+
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+#include "velox/connectors/hive/HiveSplitReader.h"
+#elif __has_include("velox/connectors/hive/SplitReader.h")
+#include "velox/connectors/hive/SplitReader.h"
+#else
+#include "velox/connectors/hive/HiveDataSource.h"
+#endif
+#include "velox/connectors/hive/TableHandle.h"
+
+namespace gluten::delta {
+
+using namespace facebook::velox;
+using namespace facebook::velox::connector;
+using namespace facebook::velox::connector::hive;
+
+// Compile-time selection of the Velox types DeltaSplitReader is built on:
+// the HiveSplitReader/File* names when
+// GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER is 1, the SplitReader/Hive* names
+// otherwise.
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+using DeltaSplitReaderBase = HiveSplitReader;
+using DeltaConfig = FileConfig;
+using DeltaTableHandlePtr = FileTableHandlePtr;
+using DeltaColumnHandleMap = std::unordered_map<std::string, FileColumnHandlePtr>;
+#else
+using DeltaSplitReaderBase = SplitReader;
+using DeltaConfig = HiveConfig;
+using DeltaTableHandlePtr = HiveTableHandlePtr;
+using DeltaColumnHandleMap = HiveColumnHandleMap;
+#endif
+
+/// Split reader for Delta data files. It delegates file reading to the base
+/// split reader and, when the split carries a deletion vector, hands the
+/// flagged row positions to the base row reader as deleted rows.
+class DeltaSplitReader : public DeltaSplitReaderBase {
+ public:
+  /// The parameter list mirrors the base split reader selected by
+  /// GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER.
+  DeltaSplitReader(
+      const std::shared_ptr<const HiveDeltaSplit>& hiveSplit,
+      const DeltaTableHandlePtr& tableHandle,
+      const DeltaColumnHandleMap* partitionKeys,
+      const ConnectorQueryCtx* connectorQueryCtx,
+      const std::shared_ptr<const DeltaConfig>& fileConfig,
+      const RowTypePtr& readerOutputType,
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+      const std::shared_ptr<io::IoStatistics>& dataIoStats,
+      const std::shared_ptr<io::IoStatistics>& metadataIoStats,
+#else
+      const std::shared_ptr<io::IoStatistics>& ioStatistics,
+#endif
+      const std::shared_ptr<IoStats>& ioStats,
+      FileHandleFactory* fileHandleFactory,
+      folly::Executor* executor,
+      const std::shared_ptr<common::ScanSpec>& scanSpec,
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+      const std::unordered_map<std::string, FileColumnHandlePtr>* infoColumns,
+      std::vector<column_index_t> bucketChannels = {},
+#endif
+      const common::SubfieldFilters* subfieldFiltersForValidation = nullptr);
+
+  /// Prepares the base reader and loads the split's deletion vector payload,
+  /// if the split has one.
+  void prepareSplit(
+      std::shared_ptr<common::MetadataFilter> metadataFilter,
+      dwio::common::RuntimeStatistics& runtimeStats,
+      const folly::F14FastMap<std::string, std::string>& fileReadOps = {}) override;
+
+  /// Reads the next batch of up to `size` rows into `output`, with the rows
+  /// flagged by the deletion vector marked as deleted.
+  uint64_t next(uint64_t size, VectorPtr& output) override;
+
+ private:
+  /// Validate that file statistics are consistent with deletion vector.
+  /// Per Delta spec: numRecords is required when DV is present.
+  /// Also validates that cardinality doesn't exceed numRecords.
+  void validateStatisticsForDeletionVectors(const DeltaFileStatistics& stats, const DeltaDeletionVectorDescriptor& dv);
+
+  // Delta deletion vectors use file-global row positions, not split-relative
+  // row numbers.
+  uint64_t baseReadRowNumber_;
+  // Reader holding the loaded deletion vector; null when the split has none.
+  std::unique_ptr<DeltaDeletionVectorReader> deletionVectorReader_;
+  // Scratch bitmap reused across batches to pass deleted rows to the base row
+  // reader.
+  BufferPtr deleteBitmap_;
+};
+
+} // namespace gluten::delta
diff --git a/cpp/velox/compute/delta/tests/CMakeLists.txt b/cpp/velox/compute/delta/tests/CMakeLists.txt
index 9641cc341c..eabe94b722 100644
--- a/cpp/velox/compute/delta/tests/CMakeLists.txt
+++ b/cpp/velox/compute/delta/tests/CMakeLists.txt
@@ -23,3 +23,15 @@ add_test(
NAME velox_roaring_bitmap_array_test
COMMAND velox_roaring_bitmap_array_test
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
+
+# Delta read-path unit tests. The three sources are built into one binary;
+# its main() is defined in DeltaConnectorTest.cpp.
+add_executable(
+  velox_delta_read_test DeltaConnectorTest.cpp
+  DeltaDeletionVectorReaderTest.cpp DeltaSplitTest.cpp)
+
+target_link_libraries(velox_delta_read_test velox roaring
+                      facebook::velox::exec_test_lib GTest::gtest)
+
+# Register the binary with CTest; it runs from this source directory.
+add_test(
+  NAME velox_delta_read_test
+  COMMAND velox_delta_read_test
+  WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
diff --git a/cpp/velox/compute/delta/tests/DeltaConnectorTest.cpp b/cpp/velox/compute/delta/tests/DeltaConnectorTest.cpp
new file mode 100644
index 0000000000..3669f911d9
--- /dev/null
+++ b/cpp/velox/compute/delta/tests/DeltaConnectorTest.cpp
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "compute/delta/DeltaConnector.h"
+#include "compute/delta/DeltaSplit.h"
+#include "compute/delta/RoaringBitmapArray.h"
+#include "folly/init/Init.h"
+#include "velox/connectors/Connector.h"
+#include "velox/connectors/hive/HiveConfig.h"
+#include "velox/exec/tests/utils/AssertQueryBuilder.h"
+#include "velox/exec/tests/utils/HiveConnectorTestBase.h"
+#include "velox/exec/tests/utils/PlanBuilder.h"
+
+#include <limits>
+
+namespace gluten::delta {
+
+namespace {
+
+/// Fixture for connector-level tests: registers a Delta connector under a
+/// fixed id on demand and unregisters it after every test.
+class DeltaConnectorTest : public ::testing::Test {
+ protected:
+  static constexpr const char* kConnectorId = "test-delta";
+
+  void TearDown() override {
+    unregisterConnector(kConnectorId);
+  }
+
+  /// (Re-)registers the Delta connector with `config`; an empty configuration
+  /// is used when none is given.
+  void registerDeltaConnector(
+      std::shared_ptr<const config::ConfigBase> config =
+          std::make_shared<config::ConfigBase>(std::unordered_map<std::string, std::string>{})) {
+    // Drop any registration left under the same id first.
+    unregisterConnector(kConnectorId);
+
+    DeltaConnectorFactory connectorFactory;
+    auto connector = connectorFactory.newConnector(kConnectorId, std::move(config));
+    registerConnector(connector);
+  }
+};
+
+// The configuration passed at registration must be visible through the
+// registered connector.
+TEST_F(DeltaConnectorTest, connectorConfiguration) {
+  std::unordered_map<std::string, std::string> settings{
+      {hive::HiveConfig::kEnableFileHandleCache, "true"}, {hive::HiveConfig::kNumCacheFileHandles, "1000"}};
+  registerDeltaConnector(std::make_shared<config::ConfigBase>(std::move(settings)));
+
+  const auto registered = getConnector(kConnectorId);
+  ASSERT_NE(registered, nullptr);
+
+  hive::HiveConfig effectiveConfig(registered->connectorConfig());
+  ASSERT_TRUE(effectiveConfig.isFileHandleCacheEnabled());
+  ASSERT_EQ(effectiveConfig.numCacheFileHandles(), 1000);
+}
+
+// A connector registered with the default configuration reports support for
+// dynamic filters and split preloading.
+TEST_F(DeltaConnectorTest, connectorProperties) {
+  registerDeltaConnector();
+
+  const auto registered = getConnector(kConnectorId);
+  ASSERT_NE(registered, nullptr);
+
+  const bool acceptsDynamicFilter = registered->canAddDynamicFilter();
+  ASSERT_TRUE(acceptsDynamicFilter);
+  const bool preloadsSplits = registered->supportsSplitPreload();
+  ASSERT_TRUE(preloadsSplits);
+}
+
+/// Fixture for end-to-end scans through the Delta connector. It builds on the
+/// Hive connector test base and additionally registers the Delta connector
+/// for the lifetime of each test.
+class DeltaConnectorExecutionTest : public facebook::velox::exec::test::HiveConnectorTestBase {
+ protected:
+  static constexpr const char* kConnectorId = "test-delta";
+
+  void SetUp() override {
+    facebook::velox::exec::test::HiveConnectorTestBase::SetUp();
+    registerDeltaConnector();
+  }
+
+  void TearDown() override {
+    unregisterConnector(kConnectorId);
+    facebook::velox::exec::test::HiveConnectorTestBase::TearDown();
+  }
+
+  /// (Re-)registers the Delta connector with `config`; an empty configuration
+  /// is used when none is given.
+  void registerDeltaConnector(
+      std::shared_ptr<const config::ConfigBase> config =
+          std::make_shared<config::ConfigBase>(std::unordered_map<std::string, std::string>{})) {
+    unregisterConnector(kConnectorId);
+
+    DeltaConnectorFactory connectorFactory;
+    auto connector = connectorFactory.newConnector(kConnectorId, std::move(config));
+    registerConnector(connector);
+  }
+
+  /// Serializes a RoaringBitmapArray containing `deletedRows` and returns the
+  /// bytes as a string.
+  std::string createSerializedPayload(const std::vector<int64_t>& deletedRows) {
+    RoaringBitmapArray deleted;
+    for (const auto position : deletedRows) {
+      deleted.addSafe(position);
+    }
+
+    const auto numBytes = deleted.serializedSizeInBytes();
+    auto scratch = AlignedBuffer::allocate<char>(numBytes, pool());
+    deleted.serialize(scratch->asMutable<char>());
+    return std::string(scratch->as<char>(), numBytes);
+  }
+
+  /// Builds a whole-file DWRF split whose deletion vector descriptor points
+  /// at `serializedPayload`. The descriptor only holds a view, so the string
+  /// must stay alive for as long as the split is used.
+  std::shared_ptr<HiveDeltaSplit>
+  makeDeltaSplit(const std::string& filePath, const std::string& serializedPayload, uint64_t cardinality) {
+    const auto* payloadBytes = reinterpret_cast<const uint8_t*>(serializedPayload.data());
+    const auto payloadSize = static_cast<int32_t>(serializedPayload.size());
+    SplitPayloadBufferView payloadView{payloadBytes, payloadSize};
+
+    return std::make_shared<HiveDeltaSplit>(
+        kConnectorId,
+        filePath,
+        facebook::velox::dwio::common::FileFormat::DWRF,
+        0,
+        std::numeric_limits<uint64_t>::max(),
+        std::unordered_map<std::string, std::optional<std::string>>{},
+        std::nullopt,
+        std::unordered_map<std::string, std::string>{{"table_format", "delta"}},
+        nullptr,
+        std::unordered_map<std::string, std::string>{},
+        true,
+        DeltaDeletionVectorDescriptor::serialized(cardinality, payloadView));
+  }
+};
+
+// Scans a ten-row file through the Delta connector, once with a deletion
+// vector that hits three rows and once with one that hits none.
+TEST_F(DeltaConnectorExecutionTest, filtersRowsUsingMaterializedDeletionVector) {
+  namespace vtest = facebook::velox::exec::test;
+
+  const auto rowType = ROW({"id"}, {BIGINT()});
+  const auto input = makeRowVector({"id"}, {makeFlatVector<int64_t>({10, 11, 12, 13, 14, 15, 16, 17, 18, 19})});
+  const auto file = vtest::TempFilePath::create();
+  writeToFile(file->getPath(), input);
+
+  const auto plan =
+      vtest::PlanBuilder(pool()).startTableScan().connectorId(kConnectorId).outputType(rowType).endTableScan().planNode();
+
+  // Positions 2, 5 and 8 hold ids 12, 15 and 18, which must be dropped.
+  const auto dvPayload = createSerializedPayload({2, 5, 8});
+  const auto dvSplit = makeDeltaSplit(file->getPath(), dvPayload, 3);
+  const auto expected = makeRowVector({"id"}, {makeFlatVector<int64_t>({10, 11, 13, 14, 16, 17, 19})});
+  vtest::AssertQueryBuilder(plan).split(dvSplit).assertResults(expected);
+
+  // Position 42 lies beyond the ten rows of the file, so nothing is removed.
+  const auto nonMatchingPayload = createSerializedPayload({42});
+  const auto nonMatchingSplit = makeDeltaSplit(file->getPath(), nonMatchingPayload, 1);
+  vtest::AssertQueryBuilder(plan).split(nonMatchingSplit).assertResults(input);
+}
+
+} // namespace
+
+} // namespace gluten::delta
+
+// Entry point of the velox_delta_read_test binary: initializes GoogleTest,
+// then folly, and runs every registered test.
+int main(int argc, char** argv) {
+  testing::InitGoogleTest(&argc, argv);
+  folly::Init follyInit(&argc, &argv, false);
+  const int exitCode = RUN_ALL_TESTS();
+  return exitCode;
+}
diff --git a/cpp/velox/compute/delta/tests/DeltaDeletionVectorReaderTest.cpp b/cpp/velox/compute/delta/tests/DeltaDeletionVectorReaderTest.cpp
new file mode 100644
index 0000000000..f214e427e1
--- /dev/null
+++ b/cpp/velox/compute/delta/tests/DeltaDeletionVectorReaderTest.cpp
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "compute/delta/DeltaDeletionVectorReader.h"
+#include "compute/delta/RoaringBitmapArray.h"
+#include "velox/common/base/tests/GTestUtils.h"
+
+#include <gtest/gtest.h>
+
+using namespace facebook::velox;
+using namespace gluten::delta;
+
+// Fixture for the DeltaDeletionVectorReader tests. Installs a Velox
+// MemoryManager with default options once per suite and hands each test a
+// fresh leaf memory pool.
+class DeltaDeletionVectorReaderTest : public ::testing::Test {
+ protected:
+  static void SetUpTestSuite() {
+    memory::MemoryManager::testingSetInstance(memory::MemoryManager::Options{});
+  }
+
+  void SetUp() override {
+    pool_ = memory::memoryManager()->addLeafPool();
+  }
+
+  // Adds every entry of `deletedRows` to a RoaringBitmapArray and returns the
+  // bitmap's serialized bytes as a std::string. The bytes are staged in a
+  // buffer allocated from `pool_` and copied out before returning.
+  std::string createSerializedPayload(const std::vector<int64_t>& deletedRows) {
+    RoaringBitmapArray deleted;
+    for (auto rowIndex : deletedRows) {
+      deleted.addSafe(rowIndex);
+    }
+
+    const auto byteCount = deleted.serializedSizeInBytes();
+    auto staging = AlignedBuffer::allocate<char>(byteCount, pool_.get());
+    deleted.serialize(staging->asMutable<char>());
+    return std::string(staging->as<char>(), byteCount);
+  }
+
+  // Leaf pool backing the buffers allocated by the tests and the helper above.
+  std::shared_ptr<memory::MemoryPool> pool_;
+};
+
+TEST_F(DeltaDeletionVectorReaderTest, LoadSerializedPayload) {
+  // A payload produced by the fixture helper must round-trip: exactly the
+  // serialized rows are reported as deleted, and neighbouring rows are not.
+  const auto serialized = createSerializedPayload({2, 7, 12});
+
+  DeltaDeletionVectorReader reader;
+  reader.loadSerializedDeletionVector(serialized, 3);
+
+  for (const int deletedRow : {2, 7, 12}) {
+    EXPECT_TRUE(reader.isRowDeleted(deletedRow));
+  }
+  for (const int liveRow : {0, 3, 20}) {
+    EXPECT_FALSE(reader.isRowDeleted(liveRow));
+  }
+}
+
+TEST_F(DeltaDeletionVectorReaderTest, LoadPortablePayload) {
+  // Bytes captured from a Delta 3.3.2 table after `DELETE WHERE id < 10`; the
+  // reader must accept them as-is and report rows 0..9 as deleted.
+  const std::vector<uint8_t> capturedBytes = {
+      0xd1, 0xd3, 0x39, 0x64, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+      0x3b, 0x30, 0x00, 0x00, 0x01, 0x00, 0x00, 0x09, 0x00, 0x01, 0x00, 0x00, 0x00, 0x09, 0x00};
+  const auto* firstByte = reinterpret_cast<const char*>(capturedBytes.data());
+
+  DeltaDeletionVectorReader reader;
+  reader.loadSerializedDeletionVector(std::string_view(firstByte, capturedBytes.size()), 10);
+
+  for (uint64_t row = 0; row < 10; ++row) {
+    EXPECT_TRUE(reader.isRowDeleted(row));
+  }
+  // Rows just past and well past the deleted range stay live.
+  EXPECT_FALSE(reader.isRowDeleted(10));
+  EXPECT_FALSE(reader.isRowDeleted(100));
+}
+
+TEST_F(DeltaDeletionVectorReaderTest, ApplyDeletionFilter) {
+  // Filtering the batch [0, 10) must set exactly the bits of the deleted rows.
+  auto payload = createSerializedPayload({2, 5, 8});
+
+  DeltaDeletionVectorReader reader;
+  reader.loadSerializedDeletionVector(payload);
+
+  // Zero-fill the bitmap. AlignedBuffer::allocate leaves the memory
+  // uninitialized unless an initial value is passed, so the EXPECT_FALSE checks
+  // below would otherwise depend on stale bytes or on applyDeletionFilter
+  // clearing the buffer itself.
+  auto deleteBitmap = AlignedBuffer::allocate<uint64_t>(bits::nwords(10), pool_.get(), 0);
+  reader.applyDeletionFilter(0, 10, deleteBitmap);
+
+  auto* rawBitmap = deleteBitmap->as<uint64_t>();
+  EXPECT_TRUE(bits::isBitSet(rawBitmap, 2));
+  EXPECT_TRUE(bits::isBitSet(rawBitmap, 5));
+  EXPECT_TRUE(bits::isBitSet(rawBitmap, 8));
+  EXPECT_FALSE(bits::isBitSet(rawBitmap, 1));
+  EXPECT_FALSE(bits::isBitSet(rawBitmap, 7));
+}
+
+TEST_F(DeltaDeletionVectorReaderTest, ApplyDeletionFilterWithOffset) {
+  // With a batch starting at row 10, bit positions are relative to the batch
+  // start: deleted rows 10, 15 and 20 map to bits 0, 5 and 10.
+  auto payload = createSerializedPayload({10, 15, 20});
+
+  DeltaDeletionVectorReader reader;
+  reader.loadSerializedDeletionVector(payload);
+
+  // Zero-fill the bitmap. AlignedBuffer::allocate leaves the memory
+  // uninitialized unless an initial value is passed, so the EXPECT_FALSE checks
+  // below would otherwise depend on stale bytes or on applyDeletionFilter
+  // clearing the buffer itself.
+  auto deleteBitmap = AlignedBuffer::allocate<uint64_t>(bits::nwords(15), pool_.get(), 0);
+  reader.applyDeletionFilter(10, 15, deleteBitmap);
+
+  auto* rawBitmap = deleteBitmap->as<uint64_t>();
+  EXPECT_TRUE(bits::isBitSet(rawBitmap, 0));
+  EXPECT_TRUE(bits::isBitSet(rawBitmap, 5));
+  EXPECT_TRUE(bits::isBitSet(rawBitmap, 10));
+  EXPECT_FALSE(bits::isBitSet(rawBitmap, 1));
+  EXPECT_FALSE(bits::isBitSet(rawBitmap, 14));
+}
+
+TEST_F(DeltaDeletionVectorReaderTest, EmptyReader) {
+  // A reader with no deletion vector loaded reports empty, deletes nothing and
+  // leaves a filter bitmap without any set bits.
+  DeltaDeletionVectorReader reader;
+
+  EXPECT_TRUE(reader.empty());
+  EXPECT_FALSE(reader.isRowDeleted(0));
+
+  // Zero-fill the bitmap. AlignedBuffer::allocate leaves the memory
+  // uninitialized unless an initial value is passed; if an empty reader does not
+  // touch the buffer, the loop below would otherwise inspect garbage.
+  auto deleteBitmap = AlignedBuffer::allocate<uint64_t>(bits::nwords(10), pool_.get(), 0);
+  reader.applyDeletionFilter(0, 10, deleteBitmap);
+
+  auto* rawBitmap = deleteBitmap->as<uint64_t>();
+  for (int i = 0; i < 10; ++i) {
+    EXPECT_FALSE(bits::isBitSet(rawBitmap, i));
+  }
+}
+
+TEST_F(DeltaDeletionVectorReaderTest, MultipleLoadsOverwrite) {
+  // A second load must replace the first deletion vector, not merge with it.
+  DeltaDeletionVectorReader reader;
+
+  // After the first load only {1, 2, 3} are deleted.
+  reader.loadSerializedDeletionVector(createSerializedPayload({1, 2, 3}));
+  EXPECT_TRUE(reader.isRowDeleted(1));
+  EXPECT_FALSE(reader.isRowDeleted(10));
+
+  // After the second load row 1 is live again and row 10 is deleted.
+  reader.loadSerializedDeletionVector(createSerializedPayload({10, 20, 30}));
+  EXPECT_FALSE(reader.isRowDeleted(1));
+  EXPECT_TRUE(reader.isRowDeleted(10));
+}
+
+TEST_F(DeltaDeletionVectorReaderTest, EmptyPayloadThrows) {
+  // A zero-length payload must be rejected with a descriptive error.
+  DeltaDeletionVectorReader reader;
+  VELOX_ASSERT_THROW(
+      reader.loadSerializedDeletionVector(std::string_view{}), "Serialized deletion vector is empty");
+}
+
+TEST_F(DeltaDeletionVectorReaderTest, CorruptedMagicNumberThrows) {
+  // A payload consisting only of an unexpected 4-byte magic number must be
+  // rejected by the loader.
+  const uint32_t bogusMagic = 12345678;
+  const auto* magicBytes = reinterpret_cast<const char*>(&bogusMagic);
+  const std::string corrupted(magicBytes, sizeof(bogusMagic));
+
+  DeltaDeletionVectorReader reader;
+  VELOX_ASSERT_THROW(
+      reader.loadSerializedDeletionVector(corrupted), "Unexpected Delta bitmap array magic number");
+}
+
+TEST_F(DeltaDeletionVectorReaderTest, CardinalityValidationSuccess) {
+  // Loading succeeds when the expected cardinality equals the number of rows
+  // in the payload, and the reader then reports that same count.
+  const auto fiveRowPayload = createSerializedPayload({1, 2, 3, 4, 5});
+
+  DeltaDeletionVectorReader reader;
+  EXPECT_NO_THROW(reader.loadSerializedDeletionVector(fiveRowPayload, 5));
+  EXPECT_EQ(reader.estimatedDeletedRowCount(), 5);
+}
+
+TEST_F(DeltaDeletionVectorReaderTest, CardinalityValidationMismatchThrows) {
+  // Five rows are serialized but only three are announced: the loader must
+  // raise a VeloxUserError.
+  const auto fiveRowPayload = createSerializedPayload({1, 2, 3, 4, 5});
+
+  DeltaDeletionVectorReader reader;
+  EXPECT_THROW(reader.loadSerializedDeletionVector(fiveRowPayload, 3), VeloxUserError);
+}
+
+TEST_F(DeltaDeletionVectorReaderTest, LargeCardinalityValidation) {
+  // Every tenth row in [0, 10000) is deleted, giving 1000 deleted rows.
+  std::vector<int64_t> everyTenthRow;
+  for (int64_t step = 0; step < 1000; ++step) {
+    everyTenthRow.push_back(step * 10);
+  }
+
+  DeltaDeletionVectorReader reader;
+  reader.loadSerializedDeletionVector(createSerializedPayload(everyTenthRow), 1000);
+  EXPECT_EQ(reader.estimatedDeletedRowCount(), 1000);
+}
+
+TEST_F(DeltaDeletionVectorReaderTest, BatchFilteringPartialOverlap) {
+  // Deleted rows 45..55 lie strictly inside the batch [40, 60), so only the
+  // batch-relative bits 5..15 may be set.
+  std::vector<int64_t> deletedRows;
+  for (int64_t i = 45; i <= 55; ++i) {
+    deletedRows.push_back(i);
+  }
+
+  DeltaDeletionVectorReader reader;
+  reader.loadSerializedDeletionVector(createSerializedPayload(deletedRows));
+
+  // Zero-fill the bitmap. AlignedBuffer::allocate leaves the memory
+  // uninitialized unless an initial value is passed, so the EXPECT_FALSE checks
+  // below would otherwise depend on stale bytes or on applyDeletionFilter
+  // clearing the buffer itself.
+  auto deleteBitmap = AlignedBuffer::allocate<uint64_t>(bits::nwords(20), pool_.get(), 0);
+  reader.applyDeletionFilter(40, 20, deleteBitmap);
+
+  auto* rawBitmap = deleteBitmap->as<uint64_t>();
+  for (int i = 0; i < 5; ++i) {
+    EXPECT_FALSE(bits::isBitSet(rawBitmap, i));
+  }
+  for (int i = 5; i <= 15; ++i) {
+    EXPECT_TRUE(bits::isBitSet(rawBitmap, i));
+  }
+  for (int i = 16; i < 20; ++i) {
+    EXPECT_FALSE(bits::isBitSet(rawBitmap, i));
+  }
+}
diff --git a/cpp/velox/compute/delta/tests/DeltaSplitTest.cpp b/cpp/velox/compute/delta/tests/DeltaSplitTest.cpp
new file mode 100644
index 0000000000..c081b278e2
--- /dev/null
+++ b/cpp/velox/compute/delta/tests/DeltaSplitTest.cpp
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "compute/delta/DeltaSplit.h"
+
+using namespace gluten::delta;
+
+TEST(DeltaSplitTest, DescriptorCarriesPayloadView) {
+  // serialized(cardinality, view) must retain both the cardinality and the
+  // payload view, and report the payload as materialized.
+  const std::string rawBytes = "payload";
+  gluten::SplitPayloadBufferView payloadView{
+      reinterpret_cast<const uint8_t*>(rawBytes.data()), static_cast<int32_t>(rawBytes.size())};
+
+  auto dvDescriptor = DeltaDeletionVectorDescriptor::serialized(3, payloadView);
+
+  ASSERT_TRUE(dvDescriptor.serializedPayloadView.has_value());
+  EXPECT_EQ(dvDescriptor.serializedPayloadView->size, rawBytes.size());
+  EXPECT_EQ(dvDescriptor.cardinality, 3);
+  EXPECT_TRUE(dvDescriptor.hasMaterializedPayload());
+}
+
+TEST(DeltaSplitTest, SplitCarriesDeletionVectorDescriptor) {
+  // A HiveDeltaSplit constructed with a deletion-vector descriptor and a row
+  // index filter type must expose both unchanged.
+  using StringMap = std::unordered_map<std::string, std::string>;
+  using OptionalStringMap = std::unordered_map<std::string, std::optional<std::string>>;
+
+  const std::string rawBytes = "serialized";
+  gluten::SplitPayloadBufferView payloadView{
+      reinterpret_cast<const uint8_t*>(rawBytes.data()), static_cast<int32_t>(rawBytes.size())};
+  auto dvDescriptor = DeltaDeletionVectorDescriptor::serialized(2, payloadView);
+
+  auto deltaSplit = std::make_shared<HiveDeltaSplit>(
+      "test-delta",
+      "/tmp/data.parquet",
+      facebook::velox::dwio::common::FileFormat::PARQUET,
+      0,
+      1024,
+      OptionalStringMap{},
+      std::nullopt,
+      StringMap{{"table_format", "delta"}},
+      nullptr,
+      StringMap{},
+      true,
+      dvDescriptor,
+      std::nullopt,
+      DeltaRowIndexFilterType::kIfContained,
+      StringMap{},
+      std::nullopt);
+
+  ASSERT_TRUE(deltaSplit->deletionVector.has_value());
+  const auto& carried = *deltaSplit->deletionVector;
+  EXPECT_EQ(carried.cardinality, 2);
+  ASSERT_TRUE(carried.serializedPayloadView.has_value());
+  EXPECT_EQ(carried.serializedPayloadView->size, rawBytes.size());
+  EXPECT_EQ(deltaSplit->filterType, DeltaRowIndexFilterType::kIfContained);
+}
+
+TEST(DeltaSplitTest, LogicalRowCountSubtractsDeletionVectorCardinality) {
+  // 10 physical records minus a deletion vector of cardinality 3 leaves 7.
+  DeltaFileStatistics fileStats{.numRecords = 10, .tightBounds = true};
+  auto dvDescriptor = DeltaDeletionVectorDescriptor::serialized(3);
+
+  EXPECT_EQ(fileStats.logicalRowCount(dvDescriptor), 7);
+}
+
+TEST(DeltaSplitTest, LogicalRowCountPreservesUnknownCounts) {
+  // Without a known record count the logical row count is reported as -1,
+  // regardless of the deletion vector's cardinality.
+  DeltaFileStatistics fileStats{.numRecords = std::nullopt, .tightBounds = std::nullopt};
+  auto dvDescriptor = DeltaDeletionVectorDescriptor::serialized(3);
+
+  EXPECT_EQ(fileStats.logicalRowCount(dvDescriptor), -1);
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]