This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new bf3d8d723f [GLUTEN-5103][VL] Use jvm libhdfs to replace c++ libhdfs3 
(#6172)
bf3d8d723f is described below

commit bf3d8d723f89b8801cc0ba2a5018c87eb1b73b3d
Author: JiaKe <[email protected]>
AuthorDate: Fri Oct 25 15:17:25 2024 +0800

    [GLUTEN-5103][VL] Use jvm libhdfs to replace c++ libhdfs3 (#6172)
---
 .../backendsapi/clickhouse/CHIteratorApi.scala     |  4 +++-
 .../GlutenClickHouseMergeTreeWriteOnS3Suite.scala  |  2 +-
 .../GlutenClickHouseMergeTreeWriteSuite.scala      |  4 ++--
 .../backendsapi/velox/VeloxIteratorApi.scala       | 20 ++++++++++++----
 .../gluten/utils/SharedLibraryLoaderCentos7.scala  |  2 +-
 .../gluten/utils/SharedLibraryLoaderCentos8.scala  |  2 +-
 .../gluten/utils/SharedLibraryLoaderCentos9.scala  |  2 +-
 .../gluten/utils/SharedLibraryLoaderDebian11.scala |  2 +-
 .../gluten/utils/SharedLibraryLoaderDebian12.scala |  2 +-
 .../utils/SharedLibraryLoaderUbuntu2004.scala      |  2 +-
 .../utils/SharedLibraryLoaderUbuntu2204.scala      |  2 +-
 cpp/velox/CMakeLists.txt                           | 28 ----------------------
 cpp/velox/compute/WholeStageResultIterator.cc      |  7 ------
 cpp/velox/utils/HdfsUtils.h                        | 22 -----------------
 ep/build-velox/src/get_velox.sh                    |  4 ++--
 ep/build-velox/src/modify_velox.patch              | 22 +----------------
 .../gluten/execution/IcebergScanTransformer.scala  |  5 +++-
 .../gluten/execution/VeloxIcebergSuite.scala       |  6 ++---
 .../apache/gluten/backendsapi/IteratorApi.scala    |  4 +++-
 .../execution/BasicScanExecTransformer.scala       | 12 ++++++----
 .../gluten/execution/WholeStageTransformer.scala   | 15 +++++++++---
 21 files changed, 61 insertions(+), 108 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 6760d29561..2fa2e4402b 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
 import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.SerializableConfiguration
 
 import java.lang.{Long => JLong}
 import java.net.URI
@@ -133,7 +134,8 @@ class CHIteratorApi extends IteratorApi with Logging with 
LogLevelUtil {
       partitionSchema: StructType,
       fileFormat: ReadFileFormat,
       metadataColumnNames: Seq[String],
-      properties: Map[String, String]): SplitInfo = {
+      properties: Map[String, String],
+      serializableHadoopConf: SerializableConfiguration): SplitInfo = {
     partition match {
       case p: GlutenMergeTreePartition =>
         ExtensionTableBuilder
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
index dfdf02d455..af67b01f49 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -771,7 +771,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
             case scanExec: BasicScanExecTransformer => scanExec
           }
           assertResult(1)(plans.size)
-          assertResult(1)(plans.head.getSplitInfos.size)
+          assertResult(1)(plans.head.getSplitInfos(null).size)
       }
     }
   }
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
index 186078c18d..0959832949 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -1801,7 +1801,7 @@ class GlutenClickHouseMergeTreeWriteSuite
                 case scanExec: BasicScanExecTransformer => scanExec
               }
               assertResult(1)(plans.size)
-              assertResult(conf._2)(plans.head.getSplitInfos.size)
+              assertResult(conf._2)(plans.head.getSplitInfos(null).size)
           }
         }
       })
@@ -1910,7 +1910,7 @@ class GlutenClickHouseMergeTreeWriteSuite
                 case f: BasicScanExecTransformer => f
               }
               assertResult(2)(scanExec.size)
-              assertResult(conf._2)(scanExec(1).getSplitInfos.size)
+              assertResult(conf._2)(scanExec(1).getSplitInfos(null).size)
           }
         }
       })
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 061daaac0f..320d1f366c 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -39,7 +39,9 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.{ExecutorManager, SparkDirectoryUtil}
+import org.apache.spark.util.{ExecutorManager, SerializableConfiguration, 
SparkDirectoryUtil}
+
+import org.apache.hadoop.fs.{FileSystem, Path}
 
 import java.lang.{Long => JLong}
 import java.nio.charset.StandardCharsets
@@ -55,7 +57,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
       partitionSchema: StructType,
       fileFormat: ReadFileFormat,
       metadataColumnNames: Seq[String],
-      properties: Map[String, String]): SplitInfo = {
+      properties: Map[String, String],
+      serializableHadoopConf: SerializableConfiguration): SplitInfo = {
     partition match {
       case f: FilePartition =>
         val (
@@ -66,7 +69,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
           modificationTimes,
           partitionColumns,
           metadataColumns) =
-          constructSplitInfo(partitionSchema, f.files, metadataColumnNames)
+          constructSplitInfo(partitionSchema, f.files, metadataColumnNames, 
serializableHadoopConf)
         val preferredLocations =
           SoftAffinity.getFilePartitionLocations(f)
         LocalFilesBuilder.makeLocalFiles(
@@ -109,7 +112,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
   private def constructSplitInfo(
       schema: StructType,
       files: Array[PartitionedFile],
-      metadataColumnNames: Seq[String]) = {
+      metadataColumnNames: Seq[String],
+      serializableHadoopConf: SerializableConfiguration) = {
     val paths = new JArrayList[String]()
     val starts = new JArrayList[JLong]
     val lengths = new JArrayList[JLong]()
@@ -121,9 +125,15 @@ class VeloxIteratorApi extends IteratorApi with Logging {
       file =>
         // The "file.filePath" in PartitionedFile is not the original encoded 
path, so the decoded
         // path is incorrect in some cases and here fix the case of ' ' by 
using GlutenURLDecoder
+        var filePath = file.filePath.toString
+        if (filePath.startsWith("viewfs")) {
+          val viewPath = new Path(filePath)
+          val viewFileSystem = FileSystem.get(viewPath.toUri, 
serializableHadoopConf.value)
+          filePath = viewFileSystem.resolvePath(viewPath).toString
+        }
         paths.add(
           GlutenURLDecoder
-            .decode(file.filePath.toString, StandardCharsets.UTF_8.name()))
+            .decode(filePath, StandardCharsets.UTF_8.name()))
         starts.add(JLong.valueOf(file.start))
         lengths.add(JLong.valueOf(file.length))
         val (fileSize, modificationTime) =
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos7.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos7.scala
index a5e638d9be..16cd18e41a 100755
--- 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos7.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos7.scala
@@ -37,7 +37,7 @@ class SharedLibraryLoaderCentos7 extends SharedLibraryLoader {
     loader.loadAndCreateLink("libntlm.so.0", "libntlm.so", false)
     loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
     loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
-    loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
+    loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
     loader.loadAndCreateLink("libre2.so.10", "libre2.so", false)
     loader.loadAndCreateLink("libzstd.so.1", "libzstd.so", false)
     loader.loadAndCreateLink("liblz4.so.1", "liblz4.so", false)
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos8.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos8.scala
index 5d8c18b8bb..0a75c30c22 100755
--- 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos8.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos8.scala
@@ -42,7 +42,7 @@ class SharedLibraryLoaderCentos8 extends SharedLibraryLoader {
     loader.loadAndCreateLink("libntlm.so.0", "libntlm.so", false)
     loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
     loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
-    loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
+    loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
     loader.loadAndCreateLink("libre2.so.0", "libre2.so", false)
     loader.loadAndCreateLink("libsodium.so.23", "libsodium.so", false)
   }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos9.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos9.scala
index 694cf4c622..50f9fe4aaa 100755
--- 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos9.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos9.scala
@@ -42,7 +42,7 @@ class SharedLibraryLoaderCentos9 extends SharedLibraryLoader {
     loader.loadAndCreateLink("libntlm.so.0", "libntlm.so", false)
     loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
     loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
-    loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
+    loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
     loader.loadAndCreateLink("libre2.so.9", "libre2.so", false)
     loader.loadAndCreateLink("libsodium.so.23", "libsodium.so", false)
   }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian11.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian11.scala
index 6927f2539e..06c065ba28 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian11.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian11.scala
@@ -44,6 +44,6 @@ class SharedLibraryLoaderDebian11 extends SharedLibraryLoader 
{
     loader.loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false)
     loader.loadAndCreateLink("libcurl.so.4", "libcurl.so", false)
     loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
-    loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
+    loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
   }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian12.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian12.scala
index ce01f4399f..8018995328 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian12.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian12.scala
@@ -50,6 +50,6 @@ class SharedLibraryLoaderDebian12 extends SharedLibraryLoader 
{
     loader.loadAndCreateLink("libevent-2.1.so.7", "libevent-2.1.so", false)
     loader.loadAndCreateLink("libcurl.so.4", "libcurl.so", false)
     loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
-    loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
+    loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
   }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala
index 79c0518ea3..d1f21a0013 100755
--- 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala
@@ -57,7 +57,7 @@ class SharedLibraryLoaderUbuntu2004 extends 
SharedLibraryLoader {
     loader.loadAndCreateLink("libicudata.so.66", "libicudata.so", false)
     loader.loadAndCreateLink("libicuuc.so.66", "libicuuc.so", false)
     loader.loadAndCreateLink("libxml2.so.2", "libxml2.so", false)
-    loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
+    loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
     loader.loadAndCreateLink("libre2.so.5", "libre2.so", false)
     loader.loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false)
     loader.loadAndCreateLink("libthrift-0.13.0.so", "libthrift.so", false)
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala
index a5d99ede42..3cf4d30237 100755
--- 
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala
@@ -42,7 +42,7 @@ class SharedLibraryLoaderUbuntu2204 extends 
SharedLibraryLoader {
     loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
     loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
     loader.loadAndCreateLink("libxml2.so.2", "libxml2.so", false)
-    loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
+    loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
     loader.loadAndCreateLink("libre2.so.9", "libre2.so", false)
     loader.loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false)
     loader.loadAndCreateLink("libthrift-0.16.0.so", "libthrift.so", false)
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index 7eaf5f1e61..47e4ff8f13 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -109,28 +109,6 @@ macro(add_duckdb)
   endif()
 endmacro()
 
-macro(find_libhdfs3)
-  find_package(libhdfs3 CONFIG)
-  if(libhdfs3_FOUND AND TARGET HDFS::hdfs3)
-    set(LIBHDFS3_LIBRARY HDFS::hdfs3)
-  else()
-    find_path(libhdfs3_INCLUDE_DIR hdfs/hdfs.h)
-    set(CMAKE_FIND_LIBRARY_SUFFIXES ".so")
-    find_library(libhdfs3_LIBRARY NAMES hdfs3)
-    find_package_handle_standard_args(libhdfs3 DEFAULT_MSG libhdfs3_INCLUDE_DIR
-                                      libhdfs3_LIBRARY)
-    add_library(HDFS::hdfs3 SHARED IMPORTED)
-    set_target_properties(
-      HDFS::hdfs3
-      PROPERTIES INTERFACE_INCLUDE_DIRECTORIES "${libhdfs3_INCLUDE_DIR}"
-                 IMPORTED_LOCATION "${libhdfs3_LIBRARY}")
-  endif()
-
-  if(NOT libhdfs3_FOUND)
-    message(FATAL_ERROR "LIBHDFS3 Library Not Found")
-  endif()
-endmacro()
-
 macro(find_re2)
   find_package(re2 CONFIG)
   if(re2_FOUND AND TARGET re2::re2)
@@ -209,10 +187,6 @@ set(VELOX_SRCS
     utils/Common.cc
     utils/VeloxBatchResizer.cc)
 
-if(ENABLE_HDFS)
-  list(APPEND VELOX_SRCS utils/HdfsUtils.cc)
-endif()
-
 if(ENABLE_S3)
   find_package(ZLIB)
 endif()
@@ -336,8 +310,6 @@ endif()
 
 if(ENABLE_HDFS)
   add_definitions(-DENABLE_HDFS)
-  find_libhdfs3()
-  target_link_libraries(velox PUBLIC HDFS::hdfs3)
 endif()
 
 if(ENABLE_S3)
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc 
b/cpp/velox/compute/WholeStageResultIterator.cc
index ea5bd59794..0e1a9bed7b 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -22,10 +22,6 @@
 #include "velox/connectors/hive/HiveConnectorSplit.h"
 #include "velox/exec/PlanNodeStats.h"
 
-#ifdef ENABLE_HDFS
-#include "utils/HdfsUtils.h"
-#endif
-
 using namespace facebook;
 
 namespace gluten {
@@ -70,9 +66,6 @@ WholeStageResultIterator::WholeStageResultIterator(
       scanNodeIds_(scanNodeIds),
       scanInfos_(scanInfos),
       streamIds_(streamIds) {
-#ifdef ENABLE_HDFS
-  gluten::updateHdfsTokens(veloxCfg_.get());
-#endif
   spillStrategy_ = veloxCfg_->get<std::string>(kSpillStrategy, 
kSpillStrategyDefaultValue);
   auto spillThreadNum = veloxCfg_->get<uint32_t>(kSpillThreadNum, 
kSpillThreadNumDefaultValue);
   if (spillThreadNum > 0) {
diff --git a/cpp/velox/utils/HdfsUtils.h b/cpp/velox/utils/HdfsUtils.h
deleted file mode 100644
index 2e07d7ddf4..0000000000
--- a/cpp/velox/utils/HdfsUtils.h
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <velox/common/config/Config.h>
-#include <memory>
-namespace gluten {
-void updateHdfsTokens(const facebook::velox::config::ConfigBase* veloxCfg);
-}
diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh
index 87d425b5d0..95811a8c26 100755
--- a/ep/build-velox/src/get_velox.sh
+++ b/ep/build-velox/src/get_velox.sh
@@ -16,8 +16,8 @@
 
 set -exu
 
-VELOX_REPO=https://github.com/oap-project/velox.git
-VELOX_BRANCH=2024_10_24
+VELOX_REPO=https://github.com/JkSelf/velox.git
+VELOX_BRANCH=libhdfs-test
 VELOX_HOME=""
 
 OS=`uname -s`
diff --git a/ep/build-velox/src/modify_velox.patch 
b/ep/build-velox/src/modify_velox.patch
index 5c1aab2489..3d45ff4b48 100644
--- a/ep/build-velox/src/modify_velox.patch
+++ b/ep/build-velox/src/modify_velox.patch
@@ -74,27 +74,7 @@ diff --git a/CMakeLists.txt b/CMakeLists.txt
 index 7f7cbc92f..52adb1250 100644
 --- a/CMakeLists.txt
 +++ b/CMakeLists.txt
-@@ -242,10 +242,15 @@ if(VELOX_ENABLE_ABFS)
- endif()
- 
- if(VELOX_ENABLE_HDFS)
--  find_library(
--    LIBHDFS3
--    NAMES libhdfs3.so libhdfs3.dylib
--    HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED)
-+  find_package(libhdfs3)
-+  if(libhdfs3_FOUND AND TARGET HDFS::hdfs3)
-+    set(LIBHDFS3 HDFS::hdfs3)
-+  else()
-+    find_library(
-+      LIBHDFS3
-+      NAMES libhdfs3.so libhdfs3.dylib
-+      HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED)
-+  endif()
-   add_definitions(-DVELOX_ENABLE_HDFS3)
- endif()
- 
-@@ -385,7 +390,7 @@ resolve_dependency(Boost 1.77.0 COMPONENTS 
${BOOST_INCLUDE_LIBRARIES})
+@@ -386,7 +391,7 @@ resolve_dependency(Boost 1.77.0 COMPONENTS 
${BOOST_INCLUDE_LIBRARIES})
  # for reference. find_package(range-v3)
  
  set_source(gflags)
diff --git 
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
 
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
index 1cbeb52a92..10d24c317c 100644
--- 
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
+++ 
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.read.{InputPartition, Scan}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
 
 import org.apache.iceberg.spark.source.GlutenIcebergSourceUtil
 
@@ -58,7 +59,9 @@ case class IcebergScanTransformer(
 
   override lazy val fileFormat: ReadFileFormat = 
GlutenIcebergSourceUtil.getFileFormat(scan)
 
-  override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): 
Seq[SplitInfo] = {
+  override def getSplitInfosFromPartitions(
+      partitions: Seq[InputPartition],
+      serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
     val groupedPartitions = SparkShimLoader.getSparkShims.orderPartitions(
       scan,
       keyGroupedPartitioning,
diff --git 
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
 
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
index bb604f534f..5ebf8883c6 100644
--- 
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
+++ 
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
@@ -128,7 +128,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
                 case plan if plan.isInstanceOf[IcebergScanTransformer] =>
                   assert(
                     
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
-                  
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
+                  
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 
3)
                 case _ => // do nothing
               }
               checkLengthAndPlan(df, 7)
@@ -208,7 +208,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
                 case plan if plan.isInstanceOf[IcebergScanTransformer] =>
                   assert(
                     
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
-                  
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
+                  
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 
3)
                 case _ => // do nothing
               }
               checkLengthAndPlan(df, 7)
@@ -289,7 +289,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
                 case plan if plan.isInstanceOf[IcebergScanTransformer] =>
                   assert(
                     
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
-                  
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 1)
+                  
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 
1)
                 case _ => // do nothing
               }
               checkLengthAndPlan(df, 5)
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
index 11211bd0da..69c9d37334 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
 import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.SerializableConfiguration
 
 trait IteratorApi {
 
@@ -37,7 +38,8 @@ trait IteratorApi {
       partitionSchema: StructType,
       fileFormat: ReadFileFormat,
       metadataColumnNames: Seq[String],
-      properties: Map[String, String]): SplitInfo
+      properties: Map[String, String],
+      serializableHadoopConf: SerializableConfiguration): SplitInfo
 
   /** Generate native row partition. */
   def genPartitions(
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index 912b93079f..d7b824b397 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer
 import org.apache.spark.sql.types.{BooleanType, StringType, StructField, 
StructType}
+import org.apache.spark.util.SerializableConfiguration
 
 import com.google.protobuf.StringValue
 import io.substrait.proto.NamedStruct
@@ -62,11 +63,13 @@ trait BasicScanExecTransformer extends LeafTransformSupport 
with BaseDataSource
   def getProperties: Map[String, String] = Map.empty
 
   /** Returns the split infos that will be processed by the underlying native 
engine. */
-  def getSplitInfos: Seq[SplitInfo] = {
-    getSplitInfosFromPartitions(getPartitions)
+  def getSplitInfos(serializableHadoopConf: SerializableConfiguration): 
Seq[SplitInfo] = {
+    getSplitInfosFromPartitions(getPartitions, serializableHadoopConf)
   }
 
-  def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): 
Seq[SplitInfo] = {
+  def getSplitInfosFromPartitions(
+      partitions: Seq[InputPartition],
+      serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
     partitions.map(
       BackendsApiManager.getIteratorApiInstance
         .genSplitInfo(
@@ -74,7 +77,8 @@ trait BasicScanExecTransformer extends LeafTransformSupport 
with BaseDataSource
           getPartitionSchema,
           fileFormat,
           getMetadataColumns.map(_.name),
-          getProperties))
+          getProperties,
+          serializableHadoopConf))
   }
 
   override protected def doValidateInternal(): ValidationResult = {
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index 7bd84e09d1..e1dfd3f570 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -40,6 +40,7 @@ import 
org.apache.spark.sql.execution.datasources.FilePartition
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
 import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.SerializableConfiguration
 
 import com.google.common.collect.Lists
 
@@ -129,6 +130,8 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
     
BackendsApiManager.getMetricsApiInstance.genWholeStageTransformerMetrics(sparkContext)
 
   val sparkConf: SparkConf = sparkContext.getConf
+  val serializableHadoopConf: SerializableConfiguration = new 
SerializableConfiguration(
+    sparkContext.hadoopConfiguration)
   val numaBindingInfo: GlutenNumaBindingInfo = 
GlutenConfig.getConf.numaBindingInfo
 
   @transient
@@ -289,12 +292,16 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
        */
       val allScanPartitions = basicScanExecTransformers.map(_.getPartitions)
       val allScanSplitInfos =
-        getSplitInfosFromPartitions(basicScanExecTransformers, 
allScanPartitions)
+        getSplitInfosFromPartitions(
+          basicScanExecTransformers,
+          allScanPartitions,
+          serializableHadoopConf)
       val inputPartitions =
         BackendsApiManager.getIteratorApiInstance.genPartitions(
           wsCtx,
           allScanSplitInfos,
           basicScanExecTransformers)
+
       val rdd = new GlutenWholeStageColumnarRDD(
         sparkContext,
         inputPartitions,
@@ -380,7 +387,8 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
 
   private def getSplitInfosFromPartitions(
       basicScanExecTransformers: Seq[BasicScanExecTransformer],
-      allScanPartitions: Seq[Seq[InputPartition]]): Seq[Seq[SplitInfo]] = {
+      allScanPartitions: Seq[Seq[InputPartition]],
+      serializableHadoopConf: SerializableConfiguration): Seq[Seq[SplitInfo]] 
= {
     // If these are two scan transformers, they must have same partitions,
     // otherwise, exchange will be inserted. We should combine the two scan
     // transformers' partitions with same index, and set them together in
@@ -398,7 +406,8 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
     //  p1n  |  p2n    => substraitContext.setSplitInfo([p1n, p2n])
     val allScanSplitInfos =
       allScanPartitions.zip(basicScanExecTransformers).map {
-        case (partition, transformer) => 
transformer.getSplitInfosFromPartitions(partition)
+        case (partition, transformer) =>
+          transformer.getSplitInfosFromPartitions(partition, 
serializableHadoopConf)
       }
     val partitionLength = allScanSplitInfos.head.size
     if (allScanSplitInfos.exists(_.size != partitionLength)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to