This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 90428b9f6f [GLUTEN-7969][VL] Enable spill to multiple directories for
micro benchmark (#7970)
90428b9f6f is described below
commit 90428b9f6f55f13379df47a4480fca76f569c498
Author: Rong Ma <[email protected]>
AuthorDate: Tue Nov 19 21:30:47 2024 +0800
[GLUTEN-7969][VL] Enable spill to multiple directories for micro benchmark
(#7970)
---
cpp/core/shuffle/LocalPartitionWriter.cc | 5 +-
cpp/core/shuffle/LocalPartitionWriter.h | 1 -
cpp/core/shuffle/Utils.cc | 38 ++++---
cpp/core/shuffle/Utils.h | 10 +-
cpp/core/utils/StringUtil.cc | 16 ++-
cpp/core/utils/StringUtil.h | 2 +
cpp/velox/benchmarks/GenericBenchmark.cc | 137 +++++++++++++++++++++-----
cpp/velox/benchmarks/common/BenchmarkUtils.cc | 90 ++++-------------
cpp/velox/benchmarks/common/BenchmarkUtils.h | 9 +-
cpp/velox/tests/CMakeLists.txt | 2 +-
cpp/velox/tests/MyUdfTest.cc | 20 ++--
docs/developers/MicroBenchmarks.md | 18 ++--
12 files changed, 187 insertions(+), 161 deletions(-)
diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc
b/cpp/core/shuffle/LocalPartitionWriter.cc
index f0edfa2573..b7bfa19304 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.cc
+++ b/cpp/core/shuffle/LocalPartitionWriter.cc
@@ -379,7 +379,7 @@ LocalPartitionWriter::LocalPartitionWriter(
}
std::string LocalPartitionWriter::nextSpilledFileDir() {
- auto spilledFileDir = getSpilledShuffleFileDir(localDirs_[dirSelection_],
subDirSelection_[dirSelection_]);
+ auto spilledFileDir = getShuffleSpillDir(localDirs_[dirSelection_],
subDirSelection_[dirSelection_]);
subDirSelection_[dirSelection_] = (subDirSelection_[dirSelection_] + 1) %
options_.numSubDirs;
dirSelection_ = (dirSelection_ + 1) % localDirs_.size();
return spilledFileDir;
@@ -505,6 +505,9 @@ arrow::Status
LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
"Merging from spill " + std::to_string(s) + " is not exhausted.
pid: " + std::to_string(pid));
}
}
+ if (std::filesystem::exists(spill->spillFile()) &&
!std::filesystem::remove(spill->spillFile())) {
+ LOG(WARNING) << "Error while deleting spill file " << spill->spillFile();
+ }
++s;
}
spills_.clear();
diff --git a/cpp/core/shuffle/LocalPartitionWriter.h
b/cpp/core/shuffle/LocalPartitionWriter.h
index 2b86a81e88..3ed0a13060 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.h
+++ b/cpp/core/shuffle/LocalPartitionWriter.h
@@ -17,7 +17,6 @@
#pragma once
-#include <arrow/filesystem/localfs.h>
#include <arrow/io/api.h>
#include "shuffle/PartitionWriter.h"
diff --git a/cpp/core/shuffle/Utils.cc b/cpp/core/shuffle/Utils.cc
index 6854c19783..a11b6b09aa 100644
--- a/cpp/core/shuffle/Utils.cc
+++ b/cpp/core/shuffle/Utils.cc
@@ -17,15 +17,15 @@
#include "shuffle/Utils.h"
#include <arrow/record_batch.h>
-#include <boost/uuid/uuid_generators.hpp>
-#include <boost/uuid/uuid_io.hpp>
#include <fcntl.h>
+#include <unistd.h>
#include <iomanip>
#include <iostream>
#include <numeric>
#include <sstream>
#include <thread>
#include "shuffle/Options.h"
+#include "utils/StringUtil.h"
#include "utils/Timer.h"
namespace gluten {
@@ -214,17 +214,10 @@ arrow::Result<std::shared_ptr<arrow::RecordBatch>>
makeUncompressedRecordBatch(
}
} // namespace gluten
-std::string gluten::generateUuid() {
- boost::uuids::random_generator generator;
- return boost::uuids::to_string(generator());
-}
-
-std::string gluten::getSpilledShuffleFileDir(const std::string& configuredDir,
int32_t subDirId) {
- auto fs = std::make_shared<arrow::fs::LocalFileSystem>();
+std::string gluten::getShuffleSpillDir(const std::string& configuredDir,
int32_t subDirId) {
std::stringstream ss;
ss << std::setfill('0') << std::setw(2) << std::hex << subDirId;
- auto dir = arrow::fs::internal::ConcatAbstractPath(configuredDir, ss.str());
- return dir;
+ return std::filesystem::path(configuredDir) / ss.str();
}
arrow::Result<std::string> gluten::createTempShuffleFile(const std::string&
dir) {
@@ -232,22 +225,25 @@ arrow::Result<std::string>
gluten::createTempShuffleFile(const std::string& dir)
return arrow::Status::Invalid("Failed to create spilled file, got empty
path.");
}
- auto fs = std::make_shared<arrow::fs::LocalFileSystem>();
- ARROW_ASSIGN_OR_RAISE(auto path_info, fs->GetFileInfo(dir));
- if (path_info.type() == arrow::fs::FileType::NotFound) {
- RETURN_NOT_OK(fs->CreateDir(dir, true));
+ if (std::filesystem::exists(dir)) {
+ if (!std::filesystem::is_directory(dir)) {
+ return arrow::Status::Invalid("Invalid directory. File path exists but
is not a directory: ", dir);
+ }
+ } else {
+ std::filesystem::create_directories(dir);
}
+ const auto parentPath = std::filesystem::path(dir);
bool exist = true;
- std::string filePath;
+ std::filesystem::path filePath;
while (exist) {
- filePath = arrow::fs::internal::ConcatAbstractPath(dir, "temp_shuffle_" +
generateUuid());
- ARROW_ASSIGN_OR_RAISE(auto file_info, fs->GetFileInfo(filePath));
- if (file_info.type() == arrow::fs::FileType::NotFound) {
- int fd = open(filePath.c_str(), O_CREAT | O_EXCL | O_RDWR, 0666);
+ filePath = parentPath / ("temp-shuffle-" + generateUuid());
+ if (!std::filesystem::exists(filePath)) {
+ auto fd = open(filePath.c_str(), O_CREAT | O_EXCL | O_RDWR, 0666);
if (fd < 0) {
if (errno != EEXIST) {
- return arrow::Status::IOError("Failed to open local file " +
filePath + ", Reason: " + strerror(errno));
+ return arrow::Status::IOError(
+ "Failed to open local file " + filePath.string() + ", Reason: "
+ strerror(errno));
}
} else {
exist = false;
diff --git a/cpp/core/shuffle/Utils.h b/cpp/core/shuffle/Utils.h
index c4e2409d2d..64b9292d9d 100644
--- a/cpp/core/shuffle/Utils.h
+++ b/cpp/core/shuffle/Utils.h
@@ -18,13 +18,13 @@
#pragma once
#include <arrow/array.h>
-#include <arrow/filesystem/filesystem.h>
-#include <arrow/filesystem/localfs.h>
-#include <arrow/filesystem/path_util.h>
#include <arrow/ipc/writer.h>
#include <arrow/type.h>
#include <arrow/util/io_util.h>
+
#include <chrono>
+#include <filesystem>
+
#include "utils/Compression.h"
namespace gluten {
@@ -36,9 +36,7 @@ static const size_t kSizeOfBinaryArrayLengthBuffer =
sizeof(BinaryArrayLengthBuf
static const size_t kSizeOfIpcOffsetBuffer = sizeof(IpcOffsetBufferType);
static const std::string kGlutenSparkLocalDirs = "GLUTEN_SPARK_LOCAL_DIRS";
-std::string generateUuid();
-
-std::string getSpilledShuffleFileDir(const std::string& configuredDir, int32_t
subDirId);
+std::string getShuffleSpillDir(const std::string& configuredDir, int32_t
subDirId);
arrow::Result<std::string> createTempShuffleFile(const std::string& dir);
diff --git a/cpp/core/utils/StringUtil.cc b/cpp/core/utils/StringUtil.cc
index fc6ebb83c0..505e1972f8 100644
--- a/cpp/core/utils/StringUtil.cc
+++ b/cpp/core/utils/StringUtil.cc
@@ -17,13 +17,16 @@
#include <filesystem>
#include <iostream>
-#include <string_view>
#include <vector>
+#include <boost/uuid/uuid_generators.hpp>
+#include <boost/uuid/uuid_io.hpp>
+
#include "Exception.h"
#include "StringUtil.h"
-std::vector<std::string> gluten::splitByDelim(const std::string& s, const char
delimiter) {
+namespace gluten {
+std::vector<std::string> splitByDelim(const std::string& s, const char
delimiter) {
if (s.empty()) {
return {};
}
@@ -41,7 +44,7 @@ std::vector<std::string> gluten::splitByDelim(const
std::string& s, const char d
return result;
}
-std::vector<std::string> gluten::splitPaths(const std::string& s, bool
checkExists) {
+std::vector<std::string> splitPaths(const std::string& s, bool checkExists) {
if (s.empty()) {
return {};
}
@@ -61,3 +64,10 @@ std::vector<std::string> gluten::splitPaths(const
std::string& s, bool checkExis
}
return paths;
}
+
+std::string generateUuid() {
+ boost::uuids::random_generator generator;
+ return boost::uuids::to_string(generator());
+}
+
+} // namespace gluten
diff --git a/cpp/core/utils/StringUtil.h b/cpp/core/utils/StringUtil.h
index 8880229616..3030651aa3 100644
--- a/cpp/core/utils/StringUtil.h
+++ b/cpp/core/utils/StringUtil.h
@@ -23,4 +23,6 @@ std::vector<std::string> splitByDelim(const std::string& s,
const char delimiter
std::vector<std::string> splitPaths(const std::string& s, bool checkExists =
false);
+std::string generateUuid();
+
} // namespace gluten
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc
b/cpp/velox/benchmarks/GenericBenchmark.cc
index e42aed9f21..4e38fb4432 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -106,6 +106,50 @@ void setUpBenchmark(::benchmark::internal::Benchmark* bm) {
}
}
+std::string generateUniqueSubdir(const std::string& parent, const std::string&
prefix = "") {
+ auto path = std::filesystem::path(parent) / (prefix + generateUuid());
+ std::error_code ec{};
+ while (!std::filesystem::create_directories(path, ec)) {
+ if (ec) {
+ LOG(ERROR) << fmt::format("Failed to created spill directory: {}, error
code: {}", path, ec.message());
+ std::exit(EXIT_FAILURE);
+ }
+ path = std::filesystem::path(parent) / (prefix + generateUuid());
+ }
+ return path;
+}
+
+std::vector<std::string> createLocalDirs() {
+ static const std::string kBenchmarkDirPrefix = "generic-benchmark-";
+ std::vector<std::string> localDirs;
+
+ auto joinedDirsC = std::getenv(gluten::kGlutenSparkLocalDirs.c_str());
+ // Check if local dirs are set from env.
+ if (joinedDirsC != nullptr && strcmp(joinedDirsC, "") > 0) {
+ auto joinedDirs = std::string(joinedDirsC);
+ auto dirs = gluten::splitPaths(joinedDirs);
+ for (const auto& dir : dirs) {
+ localDirs.push_back(generateUniqueSubdir(dir, kBenchmarkDirPrefix));
+ }
+ } else {
+ // Otherwise create 1 temp dir.
+
localDirs.push_back(generateUniqueSubdir(std::filesystem::temp_directory_path(),
kBenchmarkDirPrefix));
+ }
+ return localDirs;
+}
+
+void cleanupLocalDirs(const std::vector<std::string>& localDirs) {
+ for (const auto& localDir : localDirs) {
+ std::error_code ec;
+ std::filesystem::remove_all(localDir, ec);
+ if (ec) {
+ LOG(WARNING) << fmt::format("Failed to remove directory: {}, error
message: {}", localDir, ec.message());
+ } else {
+ LOG(INFO) << "Removed local dir: " << localDir;
+ }
+ }
+}
+
PartitionWriterOptions createPartitionWriterOptions() {
PartitionWriterOptions partitionWriterOptions{};
// Disable writer's merge.
@@ -204,11 +248,10 @@ void runShuffle(
const std::shared_ptr<gluten::ResultIterator>& resultIter,
WriterMetrics& writerMetrics,
ReaderMetrics& readerMetrics,
- bool readAfterWrite) {
- std::string dataFile;
- std::vector<std::string> localDirs;
- bool isFromEnv;
- GLUTEN_THROW_NOT_OK(setLocalDirsAndDataFileFromEnv(dataFile, localDirs,
isFromEnv));
+ bool readAfterWrite,
+ const std::vector<std::string>& localDirs,
+ const std::string& dataFileDir) {
+ GLUTEN_ASSIGN_OR_THROW(auto dataFile,
gluten::createTempShuffleFile(dataFileDir));
auto partitionWriterOptions = createPartitionWriterOptions();
auto partitionWriter = createPartitionWriter(runtime,
partitionWriterOptions, dataFile, localDirs);
@@ -252,8 +295,12 @@ void runShuffle(
readerMetrics.decompressTime = reader->getDecompressTime();
readerMetrics.deserializeTime = reader->getDeserializeTime();
}
- // Cleanup shuffle outputs
- cleanupShuffleOutput(dataFile, localDirs, isFromEnv);
+
+ if (std::filesystem::remove(dataFile)) {
+ LOG(INFO) << "Removed shuffle data file: " << dataFile;
+ } else {
+ LOG(WARNING) << "Failed to remove shuffle data file. File does not exist:
" << dataFile;
+ }
}
void updateBenchmarkMetrics(
@@ -292,7 +339,6 @@ void updateBenchmarkMetrics(
writerMetrics.bytesWritten, benchmark::Counter::kAvgIterations,
benchmark::Counter::OneK::kIs1024);
}
}
-
} // namespace
using RuntimeFactory = std::function<VeloxRuntime*(MemoryManager*
memoryManager)>;
@@ -301,6 +347,7 @@ auto BM_Generic = [](::benchmark::State& state,
const std::string& planFile,
const std::vector<std::string>& splitFiles,
const std::vector<std::string>& dataFiles,
+ const std::vector<std::string>& localDirs,
RuntimeFactory runtimeFactory,
FileReaderType readerType) {
setCpu(state);
@@ -316,6 +363,19 @@ auto BM_Generic = [](::benchmark::State& state,
splits.push_back(getPlanFromFile("ReadRel.LocalFiles", splitFile));
}
+ const auto tid = std::hash<std::thread::id>{}(std::this_thread::get_id());
+ const auto spillDirIndex = tid % localDirs.size();
+ const auto veloxSpillDir =
generateUniqueSubdir(std::filesystem::path(localDirs[spillDirIndex]) /
"gluten-spill");
+
+ std::vector<std::string> shuffleSpillDirs;
+ std::transform(localDirs.begin(), localDirs.end(),
std::back_inserter(shuffleSpillDirs), [](const auto& dir) {
+ auto path = std::filesystem::path(dir) / "shuffle-write";
+ return path;
+ });
+ // Use a different directory for data file.
+ const auto dataFileDir = gluten::getShuffleSpillDir(
+ shuffleSpillDirs[(spillDirIndex + 1) % localDirs.size()],
state.thread_index() % gluten::kDefaultNumSubDirs);
+
WriterMetrics writerMetrics{};
ReaderMetrics readerMetrics{};
int64_t readInputTime = 0;
@@ -343,11 +403,13 @@ auto BM_Generic = [](::benchmark::State& state,
for (auto& split : splits) {
runtime->parseSplitInfo(reinterpret_cast<uint8_t*>(split.data()),
split.size(), std::nullopt);
}
- auto resultIter = runtime->createResultIterator("/tmp/test-spill",
std::move(inputIters), runtime->getConfMap());
+
+ auto resultIter = runtime->createResultIterator(veloxSpillDir,
std::move(inputIters), runtime->getConfMap());
listenerPtr->setIterator(resultIter.get());
if (FLAGS_with_shuffle) {
- runShuffle(runtime, listenerPtr, resultIter, writerMetrics,
readerMetrics, false);
+ runShuffle(
+ runtime, listenerPtr, resultIter, writerMetrics, readerMetrics,
false, shuffleSpillDirs, dataFileDir);
} else {
// May write the output into file.
auto veloxPlan =
dynamic_cast<gluten::VeloxRuntime*>(runtime)->getVeloxPlan();
@@ -405,6 +467,7 @@ auto BM_Generic = [](::benchmark::State& state,
auto BM_ShuffleWriteRead = [](::benchmark::State& state,
const std::string& inputFile,
+ const std::vector<std::string>& localDirs,
RuntimeFactory runtimeFactory,
FileReaderType readerType) {
setCpu(state);
@@ -414,6 +477,10 @@ auto BM_ShuffleWriteRead = [](::benchmark::State& state,
auto* memoryManager = MemoryManager::create(kVeloxBackendKind,
std::move(listener));
auto runtime = runtimeFactory(memoryManager);
+ const size_t dirIndex =
std::hash<std::thread::id>{}(std::this_thread::get_id()) % localDirs.size();
+ const auto dataFileDir =
+ gluten::getShuffleSpillDir(localDirs[dirIndex], state.thread_index() %
gluten::kDefaultNumSubDirs);
+
WriterMetrics writerMetrics{};
ReaderMetrics readerMetrics{};
int64_t readInputTime = 0;
@@ -422,7 +489,15 @@ auto BM_ShuffleWriteRead = [](::benchmark::State& state,
ScopedTimer timer(&elapsedTime);
for (auto _ : state) {
auto resultIter = getInputIteratorFromFileReader(inputFile, readerType);
- runShuffle(runtime, listenerPtr, resultIter, writerMetrics,
readerMetrics, FLAGS_run_shuffle_read);
+ runShuffle(
+ runtime,
+ listenerPtr,
+ resultIter,
+ writerMetrics,
+ readerMetrics,
+ FLAGS_run_shuffle_read,
+ localDirs,
+ dataFileDir);
auto reader =
static_cast<FileReaderIterator*>(resultIter->getInputIter());
readInputTime += reader->getCollectBatchTime();
@@ -600,23 +675,31 @@ int main(int argc, char** argv) {
return dynamic_cast<VeloxRuntime*>(Runtime::create(kVeloxBackendKind,
memoryManager, sessionConf));
};
-#define GENERIC_BENCHMARK(READER_TYPE)
\
- do {
\
- auto* bm =
\
- ::benchmark::RegisterBenchmark(
\
- "GenericBenchmark", BM_Generic, substraitJsonFile, splitFiles,
dataFiles, runtimeFactory, READER_TYPE) \
- ->MeasureProcessCPUTime()
\
- ->UseRealTime();
\
- setUpBenchmark(bm);
\
+ const auto localDirs = createLocalDirs();
+
+#define GENERIC_BENCHMARK(READER_TYPE) \
+ do { \
+ auto* bm = ::benchmark::RegisterBenchmark( \
+ "GenericBenchmark", \
+ BM_Generic, \
+ substraitJsonFile, \
+ splitFiles, \
+ dataFiles, \
+ localDirs, \
+ runtimeFactory, \
+ READER_TYPE) \
+ ->MeasureProcessCPUTime() \
+ ->UseRealTime(); \
+ setUpBenchmark(bm); \
} while (0)
-#define SHUFFLE_WRITE_READ_BENCHMARK(READER_TYPE)
\
- do {
\
- auto* bm = ::benchmark::RegisterBenchmark(
\
- "ShuffleWriteRead", BM_ShuffleWriteRead, dataFiles[0],
runtimeFactory, READER_TYPE) \
- ->MeasureProcessCPUTime()
\
- ->UseRealTime();
\
- setUpBenchmark(bm);
\
+#define SHUFFLE_WRITE_READ_BENCHMARK(READER_TYPE)
\
+ do {
\
+ auto* bm = ::benchmark::RegisterBenchmark(
\
+ "ShuffleWriteRead", BM_ShuffleWriteRead, dataFiles[0],
localDirs, runtimeFactory, READER_TYPE) \
+ ->MeasureProcessCPUTime()
\
+ ->UseRealTime();
\
+ setUpBenchmark(bm);
\
} while (0)
if (dataFiles.empty()) {
@@ -642,5 +725,7 @@ int main(int argc, char** argv) {
gluten::VeloxBackend::get()->tearDown();
+ cleanupLocalDirs(localDirs);
+
return 0;
}
diff --git a/cpp/velox/benchmarks/common/BenchmarkUtils.cc
b/cpp/velox/benchmarks/common/BenchmarkUtils.cc
index 345f9da8e1..d0a26ff07f 100644
--- a/cpp/velox/benchmarks/common/BenchmarkUtils.cc
+++ b/cpp/velox/benchmarks/common/BenchmarkUtils.cc
@@ -23,9 +23,6 @@
#include "utils/StringUtil.h"
#include "velox/dwio/common/Options.h"
-using namespace facebook;
-namespace fs = std::filesystem;
-
DEFINE_int64(batch_size, 4096, "To set
velox::core::QueryConfig::kPreferredOutputBatchSize.");
DEFINE_int32(cpu, -1, "Run benchmark on specific CPU");
DEFINE_int32(threads, 1, "The number of threads to run this benchmark");
@@ -34,7 +31,7 @@ DEFINE_int32(iterations, 1, "The number of iterations to run
this benchmark");
namespace gluten {
namespace {
std::unordered_map<std::string, std::string> bmConfMap = defaultConf();
-}
+} // namespace
std::unordered_map<std::string, std::string> defaultConf() {
return {
@@ -60,13 +57,13 @@ std::string getPlanFromFile(const std::string& type, const
std::string& filePath
return gluten::substraitFromJsonToPb(type, msgData);
}
-velox::dwio::common::FileFormat getFileFormat(const std::string& fileFormat) {
+facebook::velox::dwio::common::FileFormat getFileFormat(const std::string&
fileFormat) {
if (fileFormat.compare("orc") == 0) {
- return velox::dwio::common::FileFormat::ORC;
+ return facebook::velox::dwio::common::FileFormat::ORC;
} else if (fileFormat.compare("parquet") == 0) {
- return velox::dwio::common::FileFormat::PARQUET;
+ return facebook::velox::dwio::common::FileFormat::PARQUET;
} else {
- return velox::dwio::common::FileFormat::UNKNOWN;
+ return facebook::velox::dwio::common::FileFormat::UNKNOWN;
}
}
@@ -84,7 +81,7 @@ std::shared_ptr<gluten::SplitInfo> getSplitInfos(const
std::string& datasetPath,
if (endsWith(singleFilePath, "." + fileFormat)) {
auto fileAbsolutePath = datasetPath + singleFilePath;
scanInfo->starts.emplace_back(0);
- scanInfo->lengths.emplace_back(fs::file_size(fileAbsolutePath));
+
scanInfo->lengths.emplace_back(std::filesystem::file_size(fileAbsolutePath));
scanInfo->paths.emplace_back("file://" + fileAbsolutePath);
}
} else {
@@ -102,7 +99,7 @@ std::shared_ptr<gluten::SplitInfo>
getSplitInfosFromFile(const std::string& file
// Set split start, length, and path to scan info.
scanInfo->starts.emplace_back(0);
- scanInfo->lengths.emplace_back(fs::file_size(fileName));
+ scanInfo->lengths.emplace_back(std::filesystem::file_size(fileName));
scanInfo->paths.emplace_back("file://" + fileName);
return scanInfo;
@@ -125,78 +122,25 @@ bool endsWith(const std::string& data, const std::string&
suffix) {
return data.find(suffix, data.size() - suffix.size()) != std::string::npos;
}
-#if 0
-std::shared_ptr<arrow::RecordBatchReader> createReader(const std::string&
path) {
- std::unique_ptr<parquet::arrow::FileReader> parquetReader;
- std::shared_ptr<arrow::RecordBatchReader> recordBatchReader;
- parquet::ArrowReaderProperties properties =
parquet::default_arrow_reader_properties();
-
- GLUTEN_THROW_NOT_OK(parquet::arrow::FileReader::Make(
- arrow::default_memory_pool(),
parquet::ParquetFileReader::OpenFile(path), properties, &parquetReader));
- GLUTEN_THROW_NOT_OK(
-
parquetReader->GetRecordBatchReader(arrow::internal::Iota(parquetReader->num_row_groups()),
&recordBatchReader));
- return recordBatchReader;
-}
-#endif
-
-void setCpu(uint32_t cpuindex) {
+void setCpu(uint32_t cpuIndex) {
static const auto kTotalCores = std::thread::hardware_concurrency();
- cpuindex = cpuindex % kTotalCores;
+ cpuIndex = cpuIndex % kTotalCores;
cpu_set_t cs;
CPU_ZERO(&cs);
- CPU_SET(cpuindex, &cs);
+ CPU_SET(cpuIndex, &cs);
if (sched_setaffinity(0, sizeof(cs), &cs) == -1) {
- LOG(WARNING) << "Error binding CPU " << std::to_string(cpuindex);
- exit(EXIT_FAILURE);
- }
-}
-
-arrow::Status
-setLocalDirsAndDataFileFromEnv(std::string& dataFile,
std::vector<std::string>& localDirs, bool& isFromEnv) {
- auto joinedDirsC = std::getenv(gluten::kGlutenSparkLocalDirs.c_str());
- if (joinedDirsC != nullptr && strcmp(joinedDirsC, "") > 0) {
- isFromEnv = true;
- // Set local dirs.
- auto joinedDirs = std::string(joinedDirsC);
- // Split local dirs and use thread id to choose one directory for data
file.
- auto dirs = gluten::splitPaths(joinedDirs);
- for (const auto& dir : dirs) {
- localDirs.push_back(arrow::fs::internal::ConcatAbstractPath(dir,
"temp_shuffle_" + generateUuid()));
- std::filesystem::create_directory(localDirs.back());
- }
- size_t id = std::hash<std::thread::id>{}(std::this_thread::get_id()) %
localDirs.size();
- ARROW_ASSIGN_OR_RAISE(dataFile,
gluten::createTempShuffleFile(localDirs[id]));
- } else {
- isFromEnv = false;
- // Otherwise create 1 temp dir and data file.
- static const std::string kBenchmarkDirsPrefix =
"columnar-shuffle-benchmark-";
- {
- // Because tmpDir will be deleted in the dtor, allow it to be deleted
upon exiting the block and then recreate it
- // in createTempShuffleFile.
- ARROW_ASSIGN_OR_RAISE(auto tmpDir,
arrow::internal::TemporaryDir::Make(kBenchmarkDirsPrefix))
- localDirs.push_back(tmpDir->path().ToString());
- }
- ARROW_ASSIGN_OR_RAISE(dataFile,
gluten::createTempShuffleFile(localDirs.back()));
- }
- return arrow::Status::OK();
-}
-
-void cleanupShuffleOutput(const std::string& dataFile, const
std::vector<std::string>& localDirs, bool isFromEnv) {
- std::filesystem::remove(dataFile);
- for (auto& localDir : localDirs) {
- if (std::filesystem::is_empty(localDir)) {
- std::filesystem::remove(localDir);
- }
+ LOG(WARNING) << "Error binding CPU " << std::to_string(cpuIndex);
+ std::exit(EXIT_FAILURE);
}
}
void BenchmarkAllocationListener::allocationChanged(int64_t diff) {
- if (usedBytes_ + diff >= limit_) {
+ if (diff > 0 && usedBytes_ + diff >= limit_) {
LOG(INFO) << fmt::format(
"reach hard limit {} when need {}, current used {}.",
- velox::succinctBytes(limit_),
- velox::succinctBytes(diff),
- velox::succinctBytes(usedBytes_));
+ facebook::velox::succinctBytes(limit_),
+ facebook::velox::succinctBytes(diff),
+ facebook::velox::succinctBytes(usedBytes_));
auto neededBytes = usedBytes_ + diff - limit_;
int64_t spilledBytes = 0;
if (iterator_) {
@@ -207,7 +151,7 @@ void BenchmarkAllocationListener::allocationChanged(int64_t
diff) {
GLUTEN_THROW_NOT_OK(shuffleWriter_->reclaimFixedSize(neededBytes -
spilledBytes, &reclaimed));
spilledBytes += reclaimed;
}
- LOG(INFO) << fmt::format("spill finish, got {}.",
velox::succinctBytes(spilledBytes));
+ LOG(INFO) << fmt::format("spill finish, got {}.",
facebook::velox::succinctBytes(spilledBytes));
} else {
usedBytes_ += diff;
}
diff --git a/cpp/velox/benchmarks/common/BenchmarkUtils.h
b/cpp/velox/benchmarks/common/BenchmarkUtils.h
index 0108f1d448..de3df96f89 100644
--- a/cpp/velox/benchmarks/common/BenchmarkUtils.h
+++ b/cpp/velox/benchmarks/common/BenchmarkUtils.h
@@ -95,12 +95,7 @@ inline std::shared_ptr<gluten::ColumnarBatch>
convertBatch(std::shared_ptr<glute
/// Return whether the data ends with suffix.
bool endsWith(const std::string& data, const std::string& suffix);
-void setCpu(uint32_t cpuindex);
-
-arrow::Status
-setLocalDirsAndDataFileFromEnv(std::string& dataFile,
std::vector<std::string>& localDirs, bool& isFromEnv);
-
-void cleanupShuffleOutput(const std::string& dataFile, const
std::vector<std::string>& localDirs, bool isFromEnv);
+void setCpu(uint32_t cpuIndex);
class BenchmarkAllocationListener final : public gluten::AllocationListener {
public:
@@ -118,7 +113,7 @@ class BenchmarkAllocationListener final : public
gluten::AllocationListener {
private:
uint64_t usedBytes_{0L};
- uint64_t limit_{0L};
+ const uint64_t limit_{0L};
gluten::ResultIterator* iterator_{nullptr};
gluten::ShuffleWriter* shuffleWriter_{nullptr};
};
diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt
index 55cab34266..ba90e45076 100644
--- a/cpp/velox/tests/CMakeLists.txt
+++ b/cpp/velox/tests/CMakeLists.txt
@@ -60,5 +60,5 @@ add_velox_test(runtime_test SOURCES RuntimeTest.cc)
add_velox_test(velox_memory_test SOURCES MemoryManagerTest.cc)
add_velox_test(buffer_outputstream_test SOURCES BufferOutputStreamTest.cc)
if(BUILD_EXAMPLES)
- add_velox_test(MyUdfTest SOURCES MyUdfTest.cc)
+ add_velox_test(my_udf_test SOURCES MyUdfTest.cc)
endif()
diff --git a/cpp/velox/tests/MyUdfTest.cc b/cpp/velox/tests/MyUdfTest.cc
index c9849d67d0..8e4f1e7dec 100644
--- a/cpp/velox/tests/MyUdfTest.cc
+++ b/cpp/velox/tests/MyUdfTest.cc
@@ -17,12 +17,13 @@
#include <vector>
#include "udf/UdfLoader.h"
-#include "velox/expression/VectorFunction.h"
+#include "velox/expression/SimpleFunctionRegistry.h"
#include "velox/functions/prestosql/tests/utils/FunctionBaseTest.h"
#include "velox/parse/TypeResolver.h"
using namespace facebook::velox::functions::test;
using namespace facebook::velox;
+
class MyUdfTest : public FunctionBaseTest {
protected:
static void SetUpTestCase() {
@@ -35,16 +36,7 @@ class MyUdfTest : public FunctionBaseTest {
};
TEST_F(MyUdfTest, hivestringstring) {
- auto map = facebook::velox::exec::vectorFunctionFactories();
- const std::string candidate =
{"org.apache.spark.sql.hive.execution.UDFStringString"};
- ASSERT(map.withRLock([&candidate](auto& self) -> bool {
- auto iter = self.find(candidate);
- std::unordered_map<std::string, std::string> values;
- const facebook::velox::core::QueryConfig config(std::move(values));
- iter->second.factory(
- candidate,
- {facebook::velox::exec::VectorFunctionArg{facebook::velox::VARCHAR()},
- facebook::velox::exec::VectorFunctionArg{facebook::velox::VARCHAR()}},
- config) != nullptr;
- });)
-}
\ No newline at end of file
+ const std::string name =
"org.apache.spark.sql.hive.execution.UDFStringString";
+ const core::QueryConfig config({});
+ EXPECT_EQ(TypeKind::VARCHAR, exec::simpleFunctions().resolveFunction(name,
{VARCHAR(), VARCHAR()})->type()->kind());
+}
diff --git a/docs/developers/MicroBenchmarks.md
b/docs/developers/MicroBenchmarks.md
index bc463fcac9..eedf5010b6 100644
--- a/docs/developers/MicroBenchmarks.md
+++ b/docs/developers/MicroBenchmarks.md
@@ -257,7 +257,7 @@ inputs from a first stage. The steps are demonstrated as
below:
1. Start spark-shell or pyspark
We need to set `spark.gluten.sql.benchmark_task.stageId` and
`spark.gluten.saveDir` to dump the inputs.
-Normally, the stage id should be greater than 0. You can run the command in
step 2 in advance to get the
+Normally, the stage id should be greater than 0. You can run the command in
step 2 in advance to get the
right stage id in your case. We shall set `spark.default.parallelism` to 1 and
`spark.sql.files.maxPartitionBytes`
large enough to make sure there will be only 1 task in the first stage.
@@ -378,17 +378,18 @@ cd /path/to/gluten/cpp/build/velox/benchmarks
--write-path /absolute_path/<dir>
```
-
## Simulate task spilling
-You can simulate task spilling by specify memory hard limit from
`--memory_limit`.
+You can simulate task spilling by specifying a memory hard limit with
`--memory_limit`. By default, spilled files are written to the system temporary directory (usually `/tmp`).
+To simulate real Gluten workloads, which use multiple spill directories,
set the environment variable `GLUTEN_SPARK_LOCAL_DIRS` to a comma-separated
list of directories.
+See [Simulate Gluten workload with multiple processes and
threads](#simulate-gluten-workload-with-multiple-processes-and-threads) for
more details.
-## Simulate Spark with multiple processes and threads
+## Simulate Gluten workload with multiple processes and threads
You can use below command to launch several processes and threads to simulate
parallel execution on
Spark. Each thread in the same process will be pinned to the core number
starting from `--cpu`.
-Suppose running on a baremetal machine with 48C, 2-socket, HT-on, launching
below command will
+Suppose you are running on a bare-metal machine with 48 cores, 2 sockets and HT on; launching
the command below will
utilize all vcores.
```shell
@@ -400,9 +401,10 @@ for ((i=0; i<${processes}; i++)); do
done
```
-If you want to add the shuffle write process, you can specify multiple
directories by setting
-environment variable `GLUTEN_SPARK_LOCAL_DIRS` to a comma-separated string for
shuffle write to
-spread the I/O pressure to multiple disks.
+To include the shuffle write process or trigger spilling via `--memory_limit`,
+you can specify multiple directories by setting the `GLUTEN_SPARK_LOCAL_DIRS`
environment variable
+to a comma-separated list of directories. This distributes the I/O load across multiple
disks, as happens in real Gluten workloads.
+Temporary subdirectories will be created under each specified directory at
runtime and will be automatically deleted if the process completes normally.
```shell
mkdir -p {/data1,/data2,/data3}/tmp # Make sure each directory has been
already created.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]