This is an automated email from the ASF dual-hosted git repository.
rui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 306de370d4 [GLUTEN-9456][VL] Add custom direct buffered input (#11452)
306de370d4 is described below
commit 306de370d403d5bd58ba4092675c004656ab9095
Author: Rui Mo <[email protected]>
AuthorDate: Fri Jan 30 09:26:40 2026 +0000
[GLUTEN-9456][VL] Add custom direct buffered input (#11452)
---
cpp/velox/compute/VeloxBackend.cc | 5 ++-
cpp/velox/memory/GlutenBufferedInputBuilder.h | 65 +++++++++++++++++++++++++++
cpp/velox/memory/GlutenDirectBufferedInput.h | 65 +++++++++++++++++++++++++++
3 files changed, 134 insertions(+), 1 deletion(-)
diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc
index a99cac823b..75fd84b104 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -28,21 +28,23 @@
#include "utils/qat/QatCodec.h"
#endif
#ifdef GLUTEN_ENABLE_GPU
+#include "operators/plannodes/CudfVectorStream.h"
#include "velox/experimental/cudf/CudfConfig.h"
#include "velox/experimental/cudf/connectors/hive/CudfHiveConnector.h"
#include "velox/experimental/cudf/exec/ToCudf.h"
-#include "operators/plannodes/CudfVectorStream.h"
#endif
#include "compute/VeloxRuntime.h"
#include "config/VeloxConfig.h"
#include "jni/JniFileSystem.h"
+#include "memory/GlutenBufferedInputBuilder.h"
#include "operators/functions/SparkExprToSubfieldFilterParser.h"
#include "shuffle/ArrowShuffleDictionaryWriter.h"
#include "udf/UdfLoader.h"
#include "utils/Exception.h"
#include "velox/common/caching/SsdCache.h"
#include "velox/common/file/FileSystems.h"
+#include "velox/connectors/hive/BufferedInputBuilder.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveDataSource.h"
#include
"velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h" //
@manual
@@ -190,6 +192,7 @@ void VeloxBackend::init(
velox::parquet::registerParquetWriterFactory();
velox::orc::registerOrcReaderFactory();
velox::exec::ExprToSubfieldFilterParser::registerParser(std::make_unique<SparkExprToSubfieldFilterParser>());
+
velox::connector::hive::BufferedInputBuilder::registerBuilder(std::make_shared<GlutenBufferedInputBuilder>());
// Register Velox functions
registerAllFunctions();
diff --git a/cpp/velox/memory/GlutenBufferedInputBuilder.h b/cpp/velox/memory/GlutenBufferedInputBuilder.h
new file mode 100644
index 0000000000..86116ff1e8
--- /dev/null
+++ b/cpp/velox/memory/GlutenBufferedInputBuilder.h
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "GlutenDirectBufferedInput.h"
+#include "velox/connectors/hive/BufferedInputBuilder.h"
+#include "velox/connectors/hive/FileHandle.h"
+#include "velox/dwio/common/CachedBufferedInput.h"
+
+namespace gluten {
+
+/// Gluten's implementation of the Hive connector BufferedInputBuilder hook.
+///
+/// Selects the BufferedInput used to read a file: a Velox CachedBufferedInput
+/// when the connector query context reports a cache, otherwise a
+/// GlutenDirectBufferedInput.
+class GlutenBufferedInputBuilder : public facebook::velox::connector::hive::BufferedInputBuilder {
+ public:
+  /// Builds the buffered input for `fileHandle`.
+  ///
+  /// @param fileHandle Supplies the file, the uuid and the groupId passed to the created input.
+  /// @param readerOpts Forwarded to the created input; its loadQuantum() is also used for the tracker lookup.
+  /// @param connectorQueryCtx Dereferenced unconditionally (cache(), scanId()), so it must not be null.
+  /// @param ioStats Forwarded to the created input.
+  /// @param fsStats Forwarded to the created input.
+  /// @param executor Forwarded to the created input.
+  /// @param fileReadOps Forwarded to the created input.
+  /// @return A CachedBufferedInput if connectorQueryCtx->cache() is set, else a GlutenDirectBufferedInput.
+  std::unique_ptr<facebook::velox::dwio::common::BufferedInput> create(
+      const facebook::velox::FileHandle& fileHandle,
+      const facebook::velox::dwio::common::ReaderOptions& readerOpts,
+      const facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx,
+      std::shared_ptr<facebook::velox::io::IoStatistics> ioStats,
+      std::shared_ptr<facebook::velox::filesystems::File::IoStats> fsStats,
+      folly::Executor* executor,
+      const folly::F14FastMap<std::string, std::string>& fileReadOps = {}) override {
+    if (connectorQueryCtx->cache()) {
+      // A cache is available: read through it.
+      return std::make_unique<facebook::velox::dwio::common::CachedBufferedInput>(
+          fileHandle.file,
+          // Fully qualified so the name resolves inside namespace gluten without
+          // depending on a using-directive supplied by the including file.
+          facebook::velox::dwio::common::MetricsLog::voidLog(),
+          fileHandle.uuid,
+          connectorQueryCtx->cache(),
+          facebook::velox::connector::Connector::getTracker(connectorQueryCtx->scanId(), readerOpts.loadQuantum()),
+          fileHandle.groupId,
+          // This branch returns immediately, so the by-value shared_ptr can be
+          // handed over instead of copied.
+          std::move(ioStats),
+          std::move(fsStats),
+          executor,
+          readerOpts,
+          fileReadOps);
+    }
+    // No cache: use Gluten's direct buffered input.
+    return std::make_unique<GlutenDirectBufferedInput>(
+        fileHandle.file,
+        facebook::velox::dwio::common::MetricsLog::voidLog(),
+        fileHandle.uuid,
+        facebook::velox::connector::Connector::getTracker(connectorQueryCtx->scanId(), readerOpts.loadQuantum()),
+        fileHandle.groupId,
+        std::move(ioStats),
+        std::move(fsStats),
+        executor,
+        readerOpts,
+        fileReadOps);
+  }
+};
+
+} // namespace gluten
diff --git a/cpp/velox/memory/GlutenDirectBufferedInput.h b/cpp/velox/memory/GlutenDirectBufferedInput.h
new file mode 100644
index 0000000000..edaff5c603
--- /dev/null
+++ b/cpp/velox/memory/GlutenDirectBufferedInput.h
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "velox/dwio/common/DirectBufferedInput.h"
+
+namespace gluten {
+
+/// DirectBufferedInput variant whose destructor waits for coalesced loads that
+/// are still in the kLoading state before cancelling them.
+class GlutenDirectBufferedInput : public facebook::velox::dwio::common::DirectBufferedInput {
+ public:
+  /// Forwards every argument unchanged to the DirectBufferedInput constructor.
+  /// By-value arguments are moved into the base; `metricsLog`, `executor` and
+  /// `readerOptions` are passed through as given.
+  GlutenDirectBufferedInput(
+      std::shared_ptr<facebook::velox::ReadFile> readFile,
+      const facebook::velox::dwio::common::MetricsLogPtr& metricsLog,
+      facebook::velox::StringIdLease fileNum,
+      std::shared_ptr<facebook::velox::cache::ScanTracker> tracker,
+      facebook::velox::StringIdLease groupId,
+      std::shared_ptr<facebook::velox::io::IoStatistics> ioStats,
+      std::shared_ptr<facebook::velox::filesystems::File::IoStats> fsStats,
+      folly::Executor* executor,
+      const facebook::velox::io::ReaderOptions& readerOptions,
+      folly::F14FastMap<std::string, std::string> fileReadOps = {})
+      : facebook::velox::dwio::common::DirectBufferedInput(
+            std::move(readFile),
+            metricsLog,
+            std::move(fileNum),
+            std::move(tracker),
+            std::move(groupId),
+            std::move(ioStats),
+            std::move(fsStats),
+            executor,
+            readerOptions,
+            std::move(fileReadOps)) {}
+
+  /// Clears `requests_`, then for each entry of `coalescedLoads_` waits if it
+  /// is still loading, cancels it, and finally clears `coalescedLoads_`.
+  /// NOTE(review): both containers are not declared in this file; presumably
+  /// they are inherited from DirectBufferedInput - confirm.
+  ~GlutenDirectBufferedInput() override {
+    requests_.clear();
+    for (auto& pending : coalescedLoads_) {
+      const bool inFlight = pending->state() == facebook::velox::cache::CoalescedLoad::State::kLoading;
+      if (inFlight) {
+        folly::SemiFuture<bool> done(false);
+        // A false return is treated as "not finished yet": block on the future
+        // that loadOrFuture was handed before moving on to cancel().
+        const bool finished = pending->loadOrFuture(&done);
+        if (!finished) {
+          std::move(done).via(&folly::QueuedImmediateExecutor::instance()).wait();
+        }
+      }
+      pending->cancel();
+    }
+    coalescedLoads_.clear();
+  }
+};
+
+} // namespace gluten
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]