This is an automated email from the ASF dual-hosted git repository.

rui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 306de370d4 [GLUTEN-9456][VL] Add custom direct buffered input (#11452)
306de370d4 is described below

commit 306de370d403d5bd58ba4092675c004656ab9095
Author: Rui Mo <[email protected]>
AuthorDate: Fri Jan 30 09:26:40 2026 +0000

    [GLUTEN-9456][VL] Add custom direct buffered input (#11452)
---
 cpp/velox/compute/VeloxBackend.cc             |  5 ++-
 cpp/velox/memory/GlutenBufferedInputBuilder.h | 65 +++++++++++++++++++++++++++
 cpp/velox/memory/GlutenDirectBufferedInput.h  | 65 +++++++++++++++++++++++++++
 3 files changed, 134 insertions(+), 1 deletion(-)

diff --git a/cpp/velox/compute/VeloxBackend.cc 
b/cpp/velox/compute/VeloxBackend.cc
index a99cac823b..75fd84b104 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -28,21 +28,23 @@
 #include "utils/qat/QatCodec.h"
 #endif
 #ifdef GLUTEN_ENABLE_GPU
+#include "operators/plannodes/CudfVectorStream.h"
 #include "velox/experimental/cudf/CudfConfig.h"
 #include "velox/experimental/cudf/connectors/hive/CudfHiveConnector.h"
 #include "velox/experimental/cudf/exec/ToCudf.h"
-#include "operators/plannodes/CudfVectorStream.h"
 #endif
 
 #include "compute/VeloxRuntime.h"
 #include "config/VeloxConfig.h"
 #include "jni/JniFileSystem.h"
+#include "memory/GlutenBufferedInputBuilder.h"
 #include "operators/functions/SparkExprToSubfieldFilterParser.h"
 #include "shuffle/ArrowShuffleDictionaryWriter.h"
 #include "udf/UdfLoader.h"
 #include "utils/Exception.h"
 #include "velox/common/caching/SsdCache.h"
 #include "velox/common/file/FileSystems.h"
+#include "velox/connectors/hive/BufferedInputBuilder.h"
 #include "velox/connectors/hive/HiveConnector.h"
 #include "velox/connectors/hive/HiveDataSource.h"
 #include 
"velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h" // 
@manual
@@ -190,6 +192,7 @@ void VeloxBackend::init(
   velox::parquet::registerParquetWriterFactory();
   velox::orc::registerOrcReaderFactory();
   
velox::exec::ExprToSubfieldFilterParser::registerParser(std::make_unique<SparkExprToSubfieldFilterParser>());
+  
velox::connector::hive::BufferedInputBuilder::registerBuilder(std::make_shared<GlutenBufferedInputBuilder>());
 
   // Register Velox functions
   registerAllFunctions();
diff --git a/cpp/velox/memory/GlutenBufferedInputBuilder.h 
b/cpp/velox/memory/GlutenBufferedInputBuilder.h
new file mode 100644
index 0000000000..86116ff1e8
--- /dev/null
+++ b/cpp/velox/memory/GlutenBufferedInputBuilder.h
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "GlutenDirectBufferedInput.h"
+#include "velox/connectors/hive/BufferedInputBuilder.h"
+#include "velox/connectors/hive/FileHandle.h"
+#include "velox/dwio/common/CachedBufferedInput.h"
+
+namespace gluten {
+
+/// BufferedInputBuilder implementation that chooses the BufferedInput used for
+/// a file read: a Velox CachedBufferedInput when the connector query context
+/// provides a cache, otherwise a GlutenDirectBufferedInput.
+class GlutenBufferedInputBuilder : public facebook::velox::connector::hive::BufferedInputBuilder {
+ public:
+  /// Creates the buffered input for `fileHandle`.
+  ///
+  /// @param fileHandle Supplies the file, its uuid and its group id.
+  /// @param readerOpts Reader options; its load quantum is used to obtain the scan tracker and the
+  ///     options are forwarded to the created input.
+  /// @param connectorQueryCtx Must not be null; it is dereferenced to read the cache and scan id.
+  /// @param ioStats IO statistics handed over to the created input.
+  /// @param fsStats File-system statistics handed over to the created input.
+  /// @param executor Executor forwarded to the created input.
+  /// @param fileReadOps Extra per-file read options forwarded to the created input.
+  /// @return A CachedBufferedInput if `connectorQueryCtx->cache()` is non-null, else a
+  ///     GlutenDirectBufferedInput.
+  std::unique_ptr<facebook::velox::dwio::common::BufferedInput> create(
+      const facebook::velox::FileHandle& fileHandle,
+      const facebook::velox::dwio::common::ReaderOptions& readerOpts,
+      const facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx,
+      std::shared_ptr<facebook::velox::io::IoStatistics> ioStats,
+      std::shared_ptr<facebook::velox::filesystems::File::IoStats> fsStats,
+      folly::Executor* executor,
+      const folly::F14FastMap<std::string, std::string>& fileReadOps = {}) override {
+    // MetricsLog is spelled with its full namespace so this header does not depend on a
+    // using-directive being in scope at the point of inclusion.
+    if (connectorQueryCtx->cache()) {
+      return std::make_unique<facebook::velox::dwio::common::CachedBufferedInput>(
+          fileHandle.file,
+          facebook::velox::dwio::common::MetricsLog::voidLog(),
+          fileHandle.uuid,
+          connectorQueryCtx->cache(),
+          facebook::velox::connector::Connector::getTracker(connectorQueryCtx->scanId(), readerOpts.loadQuantum()),
+          fileHandle.groupId,
+          // 'ioStats' is a by-value parameter that is not used after this return, so hand it over
+          // instead of copying it (matches the direct branch below).
+          std::move(ioStats),
+          std::move(fsStats),
+          executor,
+          readerOpts,
+          fileReadOps);
+    }
+    return std::make_unique<GlutenDirectBufferedInput>(
+        fileHandle.file,
+        facebook::velox::dwio::common::MetricsLog::voidLog(),
+        fileHandle.uuid,
+        facebook::velox::connector::Connector::getTracker(connectorQueryCtx->scanId(), readerOpts.loadQuantum()),
+        fileHandle.groupId,
+        std::move(ioStats),
+        std::move(fsStats),
+        executor,
+        readerOpts,
+        fileReadOps);
+  }
+};
+
+} // namespace gluten
diff --git a/cpp/velox/memory/GlutenDirectBufferedInput.h 
b/cpp/velox/memory/GlutenDirectBufferedInput.h
new file mode 100644
index 0000000000..edaff5c603
--- /dev/null
+++ b/cpp/velox/memory/GlutenDirectBufferedInput.h
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "velox/dwio/common/DirectBufferedInput.h"
+
+namespace gluten {
+
+/// DirectBufferedInput subclass whose destructor waits for coalesced loads that
+/// are still in the kLoading state before cancelling them and dropping the list.
+///
+/// 'requests_' and 'coalescedLoads_' are not declared in this class; presumably
+/// they are members inherited from DirectBufferedInput - TODO confirm they are
+/// accessible (non-private) in the Velox version being built against.
+class GlutenDirectBufferedInput : public 
facebook::velox::dwio::common::DirectBufferedInput {
+ public:
+  /// Forwards every argument to the DirectBufferedInput constructor; by-value
+  /// arguments are moved, 'metricsLog', 'executor' and 'readerOptions' are
+  /// passed through as given.
+  GlutenDirectBufferedInput(
+      std::shared_ptr<facebook::velox::ReadFile> readFile,
+      const facebook::velox::dwio::common::MetricsLogPtr& metricsLog,
+      facebook::velox::StringIdLease fileNum,
+      std::shared_ptr<facebook::velox::cache::ScanTracker> tracker,
+      facebook::velox::StringIdLease groupId,
+      std::shared_ptr<facebook::velox::io::IoStatistics> ioStats,
+      std::shared_ptr<facebook::velox::filesystems::File::IoStats> fsStats,
+      folly::Executor* executor,
+      const facebook::velox::io::ReaderOptions& readerOptions,
+      folly::F14FastMap<std::string, std::string> fileReadOps = {})
+      : DirectBufferedInput(
+            std::move(readFile),
+            metricsLog,
+            std::move(fileNum),
+            std::move(tracker),
+            std::move(groupId),
+            std::move(ioStats),
+            std::move(fsStats),
+            executor,
+            readerOptions,
+            std::move(fileReadOps)) {}
+
+  /// Clears 'requests_', then for each entry of 'coalescedLoads_': if the load
+  /// reports kLoading, blocks until the future obtained from loadOrFuture()
+  /// completes; afterwards cancel() is called on the load regardless of its
+  /// state. Finally 'coalescedLoads_' is cleared.
+  ///
+  /// NOTE(review): the wait is presumably there so that an in-flight load does
+  /// not outlive resources owned by this object - confirm with the author.
+  ~GlutenDirectBufferedInput() override {
+    requests_.clear();
+    for (auto& load : coalescedLoads_) {
+      // Only loads that report kLoading are waited on; others go straight to
+      // cancel().
+      if (load->state() == 
facebook::velox::cache::CoalescedLoad::State::kLoading) {
+        // Placeholder future, initialised with 'false', that loadOrFuture()
+        // receives a pointer to.
+        // NOTE(review): assumes loadOrFuture() returns false after pointing
+        // 'waitFuture' at the in-flight load's completion - confirm against
+        // CoalescedLoad.
+        folly::SemiFuture<bool> waitFuture(false);
+        if (!load->loadOrFuture(&waitFuture)) {
+          // Attach the future to QueuedImmediateExecutor and block this thread
+          // until it completes; the result value is not inspected.
+          auto& exec = folly::QueuedImmediateExecutor::instance();
+          std::move(waitFuture).via(&exec).wait();
+        }
+      }
+      load->cancel();
+    }
+    coalescedLoads_.clear();
+  }
+};
+
+} // namespace gluten


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to