This is an automated email from the ASF dual-hosted git repository.
kejia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 3b81ffdc12 Revert "[GLUTEN-5103][VL] Use jvm libhdfs replace c++ libhdfs3 (#6172)" (#7683)
3b81ffdc12 is described below
commit 3b81ffdc1232329a78d011c3035536c2479b941e
Author: Rong Ma <[email protected]>
AuthorDate: Fri Oct 25 15:24:45 2024 +0800
Revert "[GLUTEN-5103][VL] Use jvm libhdfs replace c++ libhdfs3 (#6172)" (#7683)
This reverts commit bf3d8d723f89b8801cc0ba2a5018c87eb1b73b3d.
---
.../backendsapi/clickhouse/CHIteratorApi.scala | 4 +---
.../GlutenClickHouseMergeTreeWriteOnS3Suite.scala | 2 +-
.../GlutenClickHouseMergeTreeWriteSuite.scala | 4 ++--
.../backendsapi/velox/VeloxIteratorApi.scala | 20 ++++------------
.../gluten/utils/SharedLibraryLoaderCentos7.scala | 2 +-
.../gluten/utils/SharedLibraryLoaderCentos8.scala | 2 +-
.../gluten/utils/SharedLibraryLoaderCentos9.scala | 2 +-
.../gluten/utils/SharedLibraryLoaderDebian11.scala | 2 +-
.../gluten/utils/SharedLibraryLoaderDebian12.scala | 2 +-
.../utils/SharedLibraryLoaderUbuntu2004.scala | 2 +-
.../utils/SharedLibraryLoaderUbuntu2204.scala | 2 +-
cpp/velox/CMakeLists.txt | 28 ++++++++++++++++++++++
cpp/velox/compute/WholeStageResultIterator.cc | 7 ++++++
cpp/velox/utils/HdfsUtils.h | 22 +++++++++++++++++
ep/build-velox/src/get_velox.sh | 4 ++--
ep/build-velox/src/modify_velox.patch | 22 ++++++++++++++++-
.../gluten/execution/IcebergScanTransformer.scala | 5 +---
.../gluten/execution/VeloxIcebergSuite.scala | 6 ++---
.../apache/gluten/backendsapi/IteratorApi.scala | 4 +---
.../execution/BasicScanExecTransformer.scala | 12 ++++------
.../gluten/execution/WholeStageTransformer.scala | 15 +++---------
21 files changed, 108 insertions(+), 61 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 2fa2e4402b..6760d29561 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -44,7 +44,6 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types._
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.SerializableConfiguration
import java.lang.{Long => JLong}
import java.net.URI
@@ -134,8 +133,7 @@ class CHIteratorApi extends IteratorApi with Logging with
LogLevelUtil {
partitionSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
- properties: Map[String, String],
- serializableHadoopConf: SerializableConfiguration): SplitInfo = {
+ properties: Map[String, String]): SplitInfo = {
partition match {
case p: GlutenMergeTreePartition =>
ExtensionTableBuilder
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
index af67b01f49..dfdf02d455 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -771,7 +771,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
case scanExec: BasicScanExecTransformer => scanExec
}
assertResult(1)(plans.size)
- assertResult(1)(plans.head.getSplitInfos(null).size)
+ assertResult(1)(plans.head.getSplitInfos.size)
}
}
}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
index 0959832949..186078c18d 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -1801,7 +1801,7 @@ class GlutenClickHouseMergeTreeWriteSuite
case scanExec: BasicScanExecTransformer => scanExec
}
assertResult(1)(plans.size)
- assertResult(conf._2)(plans.head.getSplitInfos(null).size)
+ assertResult(conf._2)(plans.head.getSplitInfos.size)
}
}
})
@@ -1910,7 +1910,7 @@ class GlutenClickHouseMergeTreeWriteSuite
case f: BasicScanExecTransformer => f
}
assertResult(2)(scanExec.size)
- assertResult(conf._2)(scanExec(1).getSplitInfos(null).size)
+ assertResult(conf._2)(scanExec(1).getSplitInfos.size)
}
}
})
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 320d1f366c..061daaac0f 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -39,9 +39,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types._
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.{ExecutorManager, SerializableConfiguration,
SparkDirectoryUtil}
-
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.spark.util.{ExecutorManager, SparkDirectoryUtil}
import java.lang.{Long => JLong}
import java.nio.charset.StandardCharsets
@@ -57,8 +55,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
partitionSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
- properties: Map[String, String],
- serializableHadoopConf: SerializableConfiguration): SplitInfo = {
+ properties: Map[String, String]): SplitInfo = {
partition match {
case f: FilePartition =>
val (
@@ -69,7 +66,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
modificationTimes,
partitionColumns,
metadataColumns) =
- constructSplitInfo(partitionSchema, f.files, metadataColumnNames,
serializableHadoopConf)
+ constructSplitInfo(partitionSchema, f.files, metadataColumnNames)
val preferredLocations =
SoftAffinity.getFilePartitionLocations(f)
LocalFilesBuilder.makeLocalFiles(
@@ -112,8 +109,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
private def constructSplitInfo(
schema: StructType,
files: Array[PartitionedFile],
- metadataColumnNames: Seq[String],
- serializableHadoopConf: SerializableConfiguration) = {
+ metadataColumnNames: Seq[String]) = {
val paths = new JArrayList[String]()
val starts = new JArrayList[JLong]
val lengths = new JArrayList[JLong]()
@@ -125,15 +121,9 @@ class VeloxIteratorApi extends IteratorApi with Logging {
file =>
// The "file.filePath" in PartitionedFile is not the original encoded
path, so the decoded
// path is incorrect in some cases and here fix the case of ' ' by
using GlutenURLDecoder
- var filePath = file.filePath.toString
- if (filePath.startsWith("viewfs")) {
- val viewPath = new Path(filePath)
- val viewFileSystem = FileSystem.get(viewPath.toUri,
serializableHadoopConf.value)
- filePath = viewFileSystem.resolvePath(viewPath).toString
- }
paths.add(
GlutenURLDecoder
- .decode(filePath, StandardCharsets.UTF_8.name()))
+ .decode(file.filePath.toString, StandardCharsets.UTF_8.name()))
starts.add(JLong.valueOf(file.start))
lengths.add(JLong.valueOf(file.length))
val (fileSize, modificationTime) =
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos7.scala
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos7.scala
index 16cd18e41a..a5e638d9be 100755
---
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos7.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos7.scala
@@ -37,7 +37,7 @@ class SharedLibraryLoaderCentos7 extends SharedLibraryLoader {
loader.loadAndCreateLink("libntlm.so.0", "libntlm.so", false)
loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
- loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
+ loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
loader.loadAndCreateLink("libre2.so.10", "libre2.so", false)
loader.loadAndCreateLink("libzstd.so.1", "libzstd.so", false)
loader.loadAndCreateLink("liblz4.so.1", "liblz4.so", false)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos8.scala
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos8.scala
index 0a75c30c22..5d8c18b8bb 100755
---
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos8.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos8.scala
@@ -42,7 +42,7 @@ class SharedLibraryLoaderCentos8 extends SharedLibraryLoader {
loader.loadAndCreateLink("libntlm.so.0", "libntlm.so", false)
loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
- loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
+ loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
loader.loadAndCreateLink("libre2.so.0", "libre2.so", false)
loader.loadAndCreateLink("libsodium.so.23", "libsodium.so", false)
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos9.scala
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos9.scala
index 50f9fe4aaa..694cf4c622 100755
---
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos9.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderCentos9.scala
@@ -42,7 +42,7 @@ class SharedLibraryLoaderCentos9 extends SharedLibraryLoader {
loader.loadAndCreateLink("libntlm.so.0", "libntlm.so", false)
loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
- loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
+ loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
loader.loadAndCreateLink("libre2.so.9", "libre2.so", false)
loader.loadAndCreateLink("libsodium.so.23", "libsodium.so", false)
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian11.scala
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian11.scala
index 06c065ba28..6927f2539e 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian11.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian11.scala
@@ -44,6 +44,6 @@ class SharedLibraryLoaderDebian11 extends SharedLibraryLoader
{
loader.loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false)
loader.loadAndCreateLink("libcurl.so.4", "libcurl.so", false)
loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
- loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
+ loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
}
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian12.scala
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian12.scala
index 8018995328..ce01f4399f 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian12.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderDebian12.scala
@@ -50,6 +50,6 @@ class SharedLibraryLoaderDebian12 extends SharedLibraryLoader
{
loader.loadAndCreateLink("libevent-2.1.so.7", "libevent-2.1.so", false)
loader.loadAndCreateLink("libcurl.so.4", "libcurl.so", false)
loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
- loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
+ loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
}
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala
index d1f21a0013..79c0518ea3 100755
---
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2004.scala
@@ -57,7 +57,7 @@ class SharedLibraryLoaderUbuntu2004 extends
SharedLibraryLoader {
loader.loadAndCreateLink("libicudata.so.66", "libicudata.so", false)
loader.loadAndCreateLink("libicuuc.so.66", "libicuuc.so", false)
loader.loadAndCreateLink("libxml2.so.2", "libxml2.so", false)
- loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
+ loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
loader.loadAndCreateLink("libre2.so.5", "libre2.so", false)
loader.loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false)
loader.loadAndCreateLink("libthrift-0.13.0.so", "libthrift.so", false)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala
index 3cf4d30237..a5d99ede42 100755
---
a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoaderUbuntu2204.scala
@@ -42,7 +42,7 @@ class SharedLibraryLoaderUbuntu2204 extends
SharedLibraryLoader {
loader.loadAndCreateLink("libgsasl.so.7", "libgsasl.so", false)
loader.loadAndCreateLink("libprotobuf.so.32", "libprotobuf.so", false)
loader.loadAndCreateLink("libxml2.so.2", "libxml2.so", false)
- loader.loadAndCreateLink("libhdfs.so.0.0.0", "libhdfs.so", false)
+ loader.loadAndCreateLink("libhdfs3.so.1", "libhdfs3.so", false)
loader.loadAndCreateLink("libre2.so.9", "libre2.so", false)
loader.loadAndCreateLink("libsnappy.so.1", "libsnappy.so", false)
loader.loadAndCreateLink("libthrift-0.16.0.so", "libthrift.so", false)
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index 47e4ff8f13..7eaf5f1e61 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -109,6 +109,28 @@ macro(add_duckdb)
endif()
endmacro()
+macro(find_libhdfs3)
+ find_package(libhdfs3 CONFIG)
+ if(libhdfs3_FOUND AND TARGET HDFS::hdfs3)
+ set(LIBHDFS3_LIBRARY HDFS::hdfs3)
+ else()
+ find_path(libhdfs3_INCLUDE_DIR hdfs/hdfs.h)
+ set(CMAKE_FIND_LIBRARY_SUFFIXES ".so")
+ find_library(libhdfs3_LIBRARY NAMES hdfs3)
+ find_package_handle_standard_args(libhdfs3 DEFAULT_MSG libhdfs3_INCLUDE_DIR
+ libhdfs3_LIBRARY)
+ add_library(HDFS::hdfs3 SHARED IMPORTED)
+ set_target_properties(
+ HDFS::hdfs3
+ PROPERTIES INTERFACE_INCLUDE_DIRECTORIES "${libhdfs3_INCLUDE_DIR}"
+ IMPORTED_LOCATION "${libhdfs3_LIBRARY}")
+ endif()
+
+ if(NOT libhdfs3_FOUND)
+ message(FATAL_ERROR "LIBHDFS3 Library Not Found")
+ endif()
+endmacro()
+
macro(find_re2)
find_package(re2 CONFIG)
if(re2_FOUND AND TARGET re2::re2)
@@ -187,6 +209,10 @@ set(VELOX_SRCS
utils/Common.cc
utils/VeloxBatchResizer.cc)
+if(ENABLE_HDFS)
+ list(APPEND VELOX_SRCS utils/HdfsUtils.cc)
+endif()
+
if(ENABLE_S3)
find_package(ZLIB)
endif()
@@ -310,6 +336,8 @@ endif()
if(ENABLE_HDFS)
add_definitions(-DENABLE_HDFS)
+ find_libhdfs3()
+ target_link_libraries(velox PUBLIC HDFS::hdfs3)
endif()
if(ENABLE_S3)
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc
b/cpp/velox/compute/WholeStageResultIterator.cc
index 0e1a9bed7b..ea5bd59794 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -22,6 +22,10 @@
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/exec/PlanNodeStats.h"
+#ifdef ENABLE_HDFS
+#include "utils/HdfsUtils.h"
+#endif
+
using namespace facebook;
namespace gluten {
@@ -66,6 +70,9 @@ WholeStageResultIterator::WholeStageResultIterator(
scanNodeIds_(scanNodeIds),
scanInfos_(scanInfos),
streamIds_(streamIds) {
+#ifdef ENABLE_HDFS
+ gluten::updateHdfsTokens(veloxCfg_.get());
+#endif
spillStrategy_ = veloxCfg_->get<std::string>(kSpillStrategy,
kSpillStrategyDefaultValue);
auto spillThreadNum = veloxCfg_->get<uint32_t>(kSpillThreadNum,
kSpillThreadNumDefaultValue);
if (spillThreadNum > 0) {
diff --git a/cpp/velox/utils/HdfsUtils.h b/cpp/velox/utils/HdfsUtils.h
new file mode 100644
index 0000000000..2e07d7ddf4
--- /dev/null
+++ b/cpp/velox/utils/HdfsUtils.h
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <velox/common/config/Config.h>
+#include <memory>
+namespace gluten {
+void updateHdfsTokens(const facebook::velox::config::ConfigBase* veloxCfg);
+}
diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh
index 95811a8c26..87d425b5d0 100755
--- a/ep/build-velox/src/get_velox.sh
+++ b/ep/build-velox/src/get_velox.sh
@@ -16,8 +16,8 @@
set -exu
-VELOX_REPO=https://github.com/JkSelf/velox.git
-VELOX_BRANCH=libhdfs-test
+VELOX_REPO=https://github.com/oap-project/velox.git
+VELOX_BRANCH=2024_10_24
VELOX_HOME=""
OS=`uname -s`
diff --git a/ep/build-velox/src/modify_velox.patch
b/ep/build-velox/src/modify_velox.patch
index 3d45ff4b48..5c1aab2489 100644
--- a/ep/build-velox/src/modify_velox.patch
+++ b/ep/build-velox/src/modify_velox.patch
@@ -74,7 +74,27 @@ diff --git a/CMakeLists.txt b/CMakeLists.txt
index 7f7cbc92f..52adb1250 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
-@@ -386,7 +391,7 @@ resolve_dependency(Boost 1.77.0 COMPONENTS
${BOOST_INCLUDE_LIBRARIES})
+@@ -242,10 +242,15 @@ if(VELOX_ENABLE_ABFS)
+ endif()
+
+ if(VELOX_ENABLE_HDFS)
+- find_library(
+- LIBHDFS3
+- NAMES libhdfs3.so libhdfs3.dylib
+- HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED)
++ find_package(libhdfs3)
++ if(libhdfs3_FOUND AND TARGET HDFS::hdfs3)
++ set(LIBHDFS3 HDFS::hdfs3)
++ else()
++ find_library(
++ LIBHDFS3
++ NAMES libhdfs3.so libhdfs3.dylib
++ HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED)
++ endif()
+ add_definitions(-DVELOX_ENABLE_HDFS3)
+ endif()
+
+@@ -385,7 +390,7 @@ resolve_dependency(Boost 1.77.0 COMPONENTS
${BOOST_INCLUDE_LIBRARIES})
# for reference. find_package(range-v3)
set_source(gflags)
diff --git
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
index 10d24c317c..1cbeb52a92 100644
---
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
+++
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.read.{InputPartition, Scan}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableConfiguration
import org.apache.iceberg.spark.source.GlutenIcebergSourceUtil
@@ -59,9 +58,7 @@ case class IcebergScanTransformer(
override lazy val fileFormat: ReadFileFormat =
GlutenIcebergSourceUtil.getFileFormat(scan)
- override def getSplitInfosFromPartitions(
- partitions: Seq[InputPartition],
- serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
+ override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]):
Seq[SplitInfo] = {
val groupedPartitions = SparkShimLoader.getSparkShims.orderPartitions(
scan,
keyGroupedPartitioning,
diff --git
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
index 5ebf8883c6..bb604f534f 100644
---
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
+++
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
@@ -128,7 +128,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
case plan if plan.isInstanceOf[IcebergScanTransformer] =>
assert(
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
-
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length ==
3)
+
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
case _ => // do nothing
}
checkLengthAndPlan(df, 7)
@@ -208,7 +208,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
case plan if plan.isInstanceOf[IcebergScanTransformer] =>
assert(
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
-
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length ==
3)
+
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
case _ => // do nothing
}
checkLengthAndPlan(df, 7)
@@ -289,7 +289,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
case plan if plan.isInstanceOf[IcebergScanTransformer] =>
assert(
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
-
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length ==
1)
+
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 1)
case _ => // do nothing
}
checkLengthAndPlan(df, 5)
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
index 69c9d37334..11211bd0da 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.SerializableConfiguration
trait IteratorApi {
@@ -38,8 +37,7 @@ trait IteratorApi {
partitionSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
- properties: Map[String, String],
- serializableHadoopConf: SerializableConfiguration): SplitInfo
+ properties: Map[String, String]): SplitInfo
/** Generate native row partition. */
def genPartitions(
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index d7b824b397..912b93079f 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.hive.HiveTableScanExecTransformer
import org.apache.spark.sql.types.{BooleanType, StringType, StructField,
StructType}
-import org.apache.spark.util.SerializableConfiguration
import com.google.protobuf.StringValue
import io.substrait.proto.NamedStruct
@@ -63,13 +62,11 @@ trait BasicScanExecTransformer extends LeafTransformSupport
with BaseDataSource
def getProperties: Map[String, String] = Map.empty
/** Returns the split infos that will be processed by the underlying native
engine. */
- def getSplitInfos(serializableHadoopConf: SerializableConfiguration):
Seq[SplitInfo] = {
- getSplitInfosFromPartitions(getPartitions, serializableHadoopConf)
+ def getSplitInfos: Seq[SplitInfo] = {
+ getSplitInfosFromPartitions(getPartitions)
}
- def getSplitInfosFromPartitions(
- partitions: Seq[InputPartition],
- serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
+ def getSplitInfosFromPartitions(partitions: Seq[InputPartition]):
Seq[SplitInfo] = {
partitions.map(
BackendsApiManager.getIteratorApiInstance
.genSplitInfo(
@@ -77,8 +74,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport
with BaseDataSource
getPartitionSchema,
fileFormat,
getMetadataColumns.map(_.name),
- getProperties,
- serializableHadoopConf))
+ getProperties))
}
override protected def doValidateInternal(): ValidationResult = {
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index e1dfd3f570..7bd84e09d1 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -40,7 +40,6 @@ import
org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.SerializableConfiguration
import com.google.common.collect.Lists
@@ -130,8 +129,6 @@ case class WholeStageTransformer(child: SparkPlan,
materializeInput: Boolean = f
BackendsApiManager.getMetricsApiInstance.genWholeStageTransformerMetrics(sparkContext)
val sparkConf: SparkConf = sparkContext.getConf
- val serializableHadoopConf: SerializableConfiguration = new
SerializableConfiguration(
- sparkContext.hadoopConfiguration)
val numaBindingInfo: GlutenNumaBindingInfo =
GlutenConfig.getConf.numaBindingInfo
@transient
@@ -292,16 +289,12 @@ case class WholeStageTransformer(child: SparkPlan,
materializeInput: Boolean = f
*/
val allScanPartitions = basicScanExecTransformers.map(_.getPartitions)
val allScanSplitInfos =
- getSplitInfosFromPartitions(
- basicScanExecTransformers,
- allScanPartitions,
- serializableHadoopConf)
+ getSplitInfosFromPartitions(basicScanExecTransformers,
allScanPartitions)
val inputPartitions =
BackendsApiManager.getIteratorApiInstance.genPartitions(
wsCtx,
allScanSplitInfos,
basicScanExecTransformers)
-
val rdd = new GlutenWholeStageColumnarRDD(
sparkContext,
inputPartitions,
@@ -387,8 +380,7 @@ case class WholeStageTransformer(child: SparkPlan,
materializeInput: Boolean = f
private def getSplitInfosFromPartitions(
basicScanExecTransformers: Seq[BasicScanExecTransformer],
- allScanPartitions: Seq[Seq[InputPartition]],
- serializableHadoopConf: SerializableConfiguration): Seq[Seq[SplitInfo]]
= {
+ allScanPartitions: Seq[Seq[InputPartition]]): Seq[Seq[SplitInfo]] = {
// If these are two scan transformers, they must have same partitions,
// otherwise, exchange will be inserted. We should combine the two scan
// transformers' partitions with same index, and set them together in
@@ -406,8 +398,7 @@ case class WholeStageTransformer(child: SparkPlan,
materializeInput: Boolean = f
// p1n | p2n => substraitContext.setSplitInfo([p1n, p2n])
val allScanSplitInfos =
allScanPartitions.zip(basicScanExecTransformers).map {
- case (partition, transformer) =>
- transformer.getSplitInfosFromPartitions(partition,
serializableHadoopConf)
+ case (partition, transformer) =>
transformer.getSplitInfosFromPartitions(partition)
}
val partitionLength = allScanSplitInfos.head.size
if (allScanSplitInfos.exists(_.size != partitionLength)) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]