This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new d746f0f72 [VL] Add reader process to shuffle benchmark (#6682)
d746f0f72 is described below
commit d746f0f7296a0c84fb046cca77d605f4a927c6f7
Author: Rong Ma <[email protected]>
AuthorDate: Tue Aug 6 15:38:01 2024 +0800
[VL] Add reader process to shuffle benchmark (#6682)
---
cpp/core/shuffle/LocalPartitionWriter.cc | 18 +-
cpp/core/shuffle/Payload.cc | 1 -
cpp/core/shuffle/Payload.h | 1 -
cpp/velox/benchmarks/GenericBenchmark.cc | 223 ++++++++++++++-------
cpp/velox/benchmarks/common/BenchmarkUtils.cc | 6 +-
cpp/velox/shuffle/VeloxHashShuffleWriter.cc | 2 +-
cpp/velox/shuffle/VeloxShuffleReader.cc | 7 +-
docs/developers/MicroBenchmarks.md | 25 ++-
.../execution/ColumnarShuffleExchangeExec.scala | 6 +-
9 files changed, 198 insertions(+), 91 deletions(-)
diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc
index fc5d758f8..031be791b 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.cc
+++ b/cpp/core/shuffle/LocalPartitionWriter.cc
@@ -609,11 +609,23 @@ arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, std::unique_ptr<
if (lastEvictPid_ != -1 && partitionId < lastEvictPid_) {
RETURN_NOT_OK(finishSpill(true));
+ lastEvictPid_ = -1;
}
- lastEvictPid_ = partitionId;
-
RETURN_NOT_OK(requestSpill(stop));
- RETURN_NOT_OK(spiller_->spill(partitionId, std::move(blockPayload)));
+
+ if (!stop) {
+ RETURN_NOT_OK(spiller_->spill(partitionId, std::move(blockPayload)));
+ } else {
+ if (spills_.size() > 0) {
+ for (auto pid = lastEvictPid_ + 1; pid <= partitionId; ++pid) {
+ auto bytesEvicted = totalBytesEvicted_;
+ RETURN_NOT_OK(mergeSpills(pid));
+ partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted;
+ }
+ }
+ RETURN_NOT_OK(spiller_->spill(partitionId, std::move(blockPayload)));
+ }
+ lastEvictPid_ = partitionId;
return arrow::Status::OK();
}
diff --git a/cpp/core/shuffle/Payload.cc b/cpp/core/shuffle/Payload.cc
index daeef24ce..b8d8274cb 100644
--- a/cpp/core/shuffle/Payload.cc
+++ b/cpp/core/shuffle/Payload.cc
@@ -293,7 +293,6 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> BlockPayload::readBufferAt(uint32_
arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> BlockPayload::deserialize(
arrow::io::InputStream* inputStream,
- const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<arrow::util::Codec>& codec,
arrow::MemoryPool* pool,
uint32_t& numRows,
diff --git a/cpp/core/shuffle/Payload.h b/cpp/core/shuffle/Payload.h
index 4c53065a6..0a317d9c3 100644
--- a/cpp/core/shuffle/Payload.h
+++ b/cpp/core/shuffle/Payload.h
@@ -88,7 +88,6 @@ class BlockPayload final : public Payload {
static arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> deserialize(
arrow::io::InputStream* inputStream,
- const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<arrow::util::Codec>& codec,
arrow::MemoryPool* pool,
uint32_t& numRows,
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc
index a7776f9f1..4b46dbd59 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -30,6 +30,7 @@
#include "compute/VeloxPlanConverter.h"
#include "compute/VeloxRuntime.h"
#include "config/GlutenConfig.h"
+#include "config/VeloxConfig.h"
#include "shuffle/LocalPartitionWriter.h"
#include "shuffle/VeloxShuffleWriter.h"
#include "shuffle/rss/RssPartitionWriter.h"
@@ -44,22 +45,23 @@ using namespace gluten;
namespace {
+DEFINE_bool(run_example, false, "Run the example and exit.");
DEFINE_bool(print_result, true, "Print result for execution");
DEFINE_string(save_output, "", "Path to parquet file for saving the task output iterator");
DEFINE_bool(with_shuffle, false, "Add shuffle split at end.");
+DEFINE_bool(run_shuffle, false, "Only run shuffle write.");
+DEFINE_bool(run_shuffle_read, false, "Whether to run shuffle read when run_shuffle is true.");
+DEFINE_string(shuffle_writer, "hash", "Shuffle writer type. Can be hash or sort");
DEFINE_string(
partitioning,
"rr",
"Short partitioning name. Valid options are rr, hash, range, single,
random (only for test purpose)");
-DEFINE_string(shuffle_writer, "hash", "Shuffle writer type. Can be hash or
sort");
DEFINE_bool(rss, false, "Mocking rss.");
DEFINE_string(
compression,
"lz4",
"Specify the compression codec. Valid options are lz4, zstd, qat_gzip,
qat_zstd, iaa_gzip");
DEFINE_int32(shuffle_partitions, 200, "Number of shuffle split (reducer)
partitions");
-DEFINE_bool(run_shuffle, false, "Only run shuffle write.");
-DEFINE_bool(run_example, false, "Run the example and exit.");
DEFINE_string(plan, "", "Path to input json file of the substrait plan.");
DEFINE_string(
@@ -76,15 +78,21 @@ DEFINE_string(
"Scan mode for reading parquet data."
"'stream' mode: Input file scan happens inside of the pipeline."
"'buffered' mode: First read all data into memory and feed the pipeline
with it.");
+DEFINE_bool(debug_mode, false, "Whether to enable debug mode. Same as setting
`spark.gluten.sql.debug`");
struct WriterMetrics {
- int64_t splitTime;
- int64_t evictTime;
- int64_t writeTime;
- int64_t compressTime;
+ int64_t splitTime{0};
+ int64_t evictTime{0};
+ int64_t writeTime{0};
+ int64_t compressTime{0};
+
+ int64_t bytesSpilled{0};
+ int64_t bytesWritten{0};
+};
- public:
- explicit WriterMetrics() : splitTime(0), evictTime(0), writeTime(0), compressTime(0) {}
+struct ReaderMetrics {
+ int64_t decompressTime{0};
+ int64_t deserializeTime{0};
};
void setUpBenchmark(::benchmark::internal::Benchmark* bm) {
@@ -98,9 +106,10 @@ void setUpBenchmark(::benchmark::internal::Benchmark* bm) {
}
}
-std::shared_ptr<VeloxShuffleWriter>
-createShuffleWriter(Runtime* runtime, const std::string& dataFile, const std::vector<std::string>& localDirs) {
+PartitionWriterOptions createPartitionWriterOptions() {
PartitionWriterOptions partitionWriterOptions{};
+ // Disable writer's merge.
+ partitionWriterOptions.mergeThreshold = 0;
// Configure compression.
if (FLAGS_compression == "lz4") {
@@ -121,27 +130,39 @@ createShuffleWriter(Runtime* runtime, const std::string& dataFile, const std::ve
partitionWriterOptions.codecBackend = CodecBackend::IAA;
partitionWriterOptions.compressionType = arrow::Compression::GZIP;
}
+ return partitionWriterOptions;
+}
+std::unique_ptr<PartitionWriter> createPartitionWriter(
+ Runtime* runtime,
+ PartitionWriterOptions options,
+ const std::string& dataFile,
+ const std::vector<std::string>& localDirs) {
std::unique_ptr<PartitionWriter> partitionWriter;
if (FLAGS_rss) {
auto rssClient = std::make_unique<LocalRssClient>(dataFile);
partitionWriter = std::make_unique<RssPartitionWriter>(
FLAGS_shuffle_partitions,
- std::move(partitionWriterOptions),
+ std::move(options),
runtime->memoryManager()->getArrowMemoryPool(),
std::move(rssClient));
} else {
partitionWriter = std::make_unique<LocalPartitionWriter>(
FLAGS_shuffle_partitions,
- std::move(partitionWriterOptions),
+ std::move(options),
runtime->memoryManager()->getArrowMemoryPool(),
dataFile,
localDirs);
}
+ return partitionWriter;
+}
+std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(
+ Runtime* runtime,
+ std::unique_ptr<PartitionWriter> partitionWriter) {
auto options = ShuffleWriterOptions{};
options.partitioning = gluten::toPartitioning(FLAGS_partitioning);
- if (FLAGS_rss) {
+ if (FLAGS_rss || FLAGS_shuffle_writer == "rss_sort") {
options.shuffleWriterType = gluten::kRssSortShuffle;
} else if (FLAGS_shuffle_writer == "sort") {
options.shuffleWriterType = gluten::kSortShuffle;
@@ -163,6 +184,8 @@ void populateWriterMetrics(
if (splitTime > 0) {
metrics.splitTime += splitTime;
}
+ metrics.bytesWritten += shuffleWriter->totalBytesWritten();
+ metrics.bytesSpilled += shuffleWriter->totalBytesEvicted();
}
void setCpu(::benchmark::State& state) {
@@ -171,7 +194,7 @@ void setCpu(::benchmark::State& state) {
if (FLAGS_cpu != -1) {
cpu += FLAGS_cpu;
}
- LOG(INFO) << "Setting CPU for thread " << state.thread_index() << " to " <<
cpu;
+ LOG(WARNING) << "Setting CPU for thread " << state.thread_index() << " to "
<< cpu;
gluten::setCpu(cpu);
}
@@ -179,26 +202,56 @@ void runShuffle(
Runtime* runtime,
BenchmarkAllocationListener* listener,
const std::shared_ptr<gluten::ResultIterator>& resultIter,
- WriterMetrics& metrics) {
+ WriterMetrics& writerMetrics,
+ ReaderMetrics& readerMetrics,
+ bool readAfterWrite) {
std::string dataFile;
std::vector<std::string> localDirs;
bool isFromEnv;
GLUTEN_THROW_NOT_OK(setLocalDirsAndDataFileFromEnv(dataFile, localDirs, isFromEnv));
- auto shuffleWriter = createShuffleWriter(runtime, dataFile, localDirs);
+ auto partitionWriterOptions = createPartitionWriterOptions();
+ auto partitionWriter = createPartitionWriter(runtime, partitionWriterOptions, dataFile, localDirs);
+ auto shuffleWriter = createShuffleWriter(runtime, std::move(partitionWriter));
listener->setShuffleWriter(shuffleWriter.get());
int64_t totalTime = 0;
+ std::shared_ptr<ArrowSchema> cSchema;
{
gluten::ScopedTimer timer(&totalTime);
while (resultIter->hasNext()) {
- GLUTEN_THROW_NOT_OK(
- shuffleWriter->write(resultIter->next(), ShuffleWriter::kMaxMemLimit
- shuffleWriter->cachedPayloadSize()));
+ auto cb = resultIter->next();
+ if (!cSchema) {
+ cSchema = cb->exportArrowSchema();
+ }
+ GLUTEN_THROW_NOT_OK(shuffleWriter->write(cb, ShuffleWriter::kMaxMemLimit
- shuffleWriter->cachedPayloadSize()));
}
GLUTEN_THROW_NOT_OK(shuffleWriter->stop());
}
- populateWriterMetrics(shuffleWriter, totalTime, metrics);
+ populateWriterMetrics(shuffleWriter, totalTime, writerMetrics);
+
+ if (readAfterWrite && cSchema) {
+ auto readerOptions = ShuffleReaderOptions{};
+ readerOptions.shuffleWriterType = shuffleWriter->options().shuffleWriterType;
+ readerOptions.compressionType = partitionWriterOptions.compressionType;
+ readerOptions.codecBackend = partitionWriterOptions.codecBackend;
+ readerOptions.compressionTypeStr = partitionWriterOptions.compressionTypeStr;
+
+ std::shared_ptr<arrow::Schema> schema =
+ gluten::arrowGetOrThrow(arrow::ImportSchema(reinterpret_cast<struct ArrowSchema*>(cSchema.get())));
+ auto reader = runtime->createShuffleReader(schema, readerOptions);
+
+ GLUTEN_ASSIGN_OR_THROW(auto in, arrow::io::ReadableFile::Open(dataFile));
+ // Read all partitions.
+ auto iter = reader->readStream(in);
+ while (iter->hasNext()) {
+ // Read and discard.
+ auto cb = iter->next();
+ }
+ readerMetrics.decompressTime = reader->getDecompressTime();
+ readerMetrics.deserializeTime = reader->getDeserializeTime();
+ }
// Cleanup shuffle outputs
cleanupShuffleOutput(dataFile, localDirs, isFromEnv);
}
@@ -207,20 +260,37 @@ void updateBenchmarkMetrics(
::benchmark::State& state,
const int64_t& elapsedTime,
const int64_t& readInputTime,
- const WriterMetrics& writerMetrics) {
+ const WriterMetrics& writerMetrics,
+ const ReaderMetrics& readerMetrics) {
state.counters["read_input_time"] =
benchmark::Counter(readInputTime, benchmark::Counter::kAvgIterations,
benchmark::Counter::OneK::kIs1000);
state.counters["elapsed_time"] =
benchmark::Counter(elapsedTime, benchmark::Counter::kAvgIterations,
benchmark::Counter::OneK::kIs1000);
- state.counters["shuffle_write_time"] = benchmark::Counter(
- writerMetrics.writeTime, benchmark::Counter::kAvgIterations,
benchmark::Counter::OneK::kIs1000);
- state.counters["shuffle_spill_time"] = benchmark::Counter(
- writerMetrics.evictTime, benchmark::Counter::kAvgIterations,
benchmark::Counter::OneK::kIs1000);
- state.counters["shuffle_split_time"] = benchmark::Counter(
- writerMetrics.splitTime, benchmark::Counter::kAvgIterations,
benchmark::Counter::OneK::kIs1000);
- state.counters["shuffle_compress_time"] = benchmark::Counter(
- writerMetrics.compressTime, benchmark::Counter::kAvgIterations,
benchmark::Counter::OneK::kIs1000);
+ if (FLAGS_run_shuffle || FLAGS_with_shuffle) {
+ state.counters["shuffle_write_time"] = benchmark::Counter(
+ writerMetrics.writeTime, benchmark::Counter::kAvgIterations,
benchmark::Counter::OneK::kIs1000);
+ state.counters["shuffle_spill_time"] = benchmark::Counter(
+ writerMetrics.evictTime, benchmark::Counter::kAvgIterations,
benchmark::Counter::OneK::kIs1000);
+ state.counters["shuffle_compress_time"] = benchmark::Counter(
+ writerMetrics.compressTime, benchmark::Counter::kAvgIterations,
benchmark::Counter::OneK::kIs1000);
+ state.counters["shuffle_decompress_time"] = benchmark::Counter(
+ readerMetrics.decompressTime, benchmark::Counter::kAvgIterations,
benchmark::Counter::OneK::kIs1000);
+ state.counters["shuffle_deserialize_time"] = benchmark::Counter(
+ readerMetrics.deserializeTime, benchmark::Counter::kAvgIterations,
benchmark::Counter::OneK::kIs1000);
+
+ auto splitTime = writerMetrics.splitTime;
+ if (FLAGS_scan_mode == "stream") {
+ splitTime -= readInputTime;
+ }
+ state.counters["shuffle_split_time"] =
+ benchmark::Counter(splitTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000);
+
+ state.counters["shuffle_spilled_bytes"] = benchmark::Counter(
+ writerMetrics.bytesSpilled, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1024);
+ state.counters["shuffle_write_bytes"] = benchmark::Counter(
+ writerMetrics.bytesWritten, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1024);
+ }
}
} // namespace
@@ -246,6 +316,7 @@ auto BM_Generic = [](::benchmark::State& state,
}
WriterMetrics writerMetrics{};
+ ReaderMetrics readerMetrics{};
int64_t readInputTime = 0;
int64_t elapsedTime = 0;
@@ -275,7 +346,7 @@ auto BM_Generic = [](::benchmark::State& state,
listenerPtr->setIterator(resultIter.get());
if (FLAGS_with_shuffle) {
- runShuffle(runtime, listenerPtr, resultIter, writerMetrics);
+ runShuffle(runtime, listenerPtr, resultIter, writerMetrics, readerMetrics, false);
} else {
// May write the output into file.
auto veloxPlan = dynamic_cast<gluten::VeloxRuntime*>(runtime)->getVeloxPlan();
@@ -299,7 +370,7 @@ auto BM_Generic = [](::benchmark::State& state,
return;
}
if (FLAGS_print_result) {
- LOG(INFO) << maybeBatch.ValueOrDie()->ToString();
+ LOG(WARNING) << maybeBatch.ValueOrDie()->ToString();
}
if (!FLAGS_save_output.empty()) {
GLUTEN_THROW_NOT_OK(writer.writeInBatches(maybeBatch.ValueOrDie()));
@@ -322,18 +393,18 @@ auto BM_Generic = [](::benchmark::State& state,
const auto* task = rawIter->task();
const auto* planNode = rawIter->veloxPlan();
auto statsStr = facebook::velox::exec::printPlanWithStats(*planNode, task->taskStats(), true);
- LOG(INFO) << statsStr;
+ LOG(WARNING) << statsStr;
}
}
- updateBenchmarkMetrics(state, elapsedTime, readInputTime, writerMetrics);
+ updateBenchmarkMetrics(state, elapsedTime, readInputTime, writerMetrics, readerMetrics);
Runtime::release(runtime);
};
-auto BM_ShuffleWrite = [](::benchmark::State& state,
- const std::string& inputFile,
- RuntimeFactory runtimeFactory,
- FileReaderType readerType) {
+auto BM_ShuffleWriteRead = [](::benchmark::State& state,
+ const std::string& inputFile,
+ RuntimeFactory runtimeFactory,
+ FileReaderType readerType) {
setCpu(state);
auto listener = std::make_unique<BenchmarkAllocationListener>(FLAGS_memory_limit);
@@ -341,31 +412,48 @@ auto BM_ShuffleWrite = [](::benchmark::State& state,
auto runtime = runtimeFactory(std::move(listener));
WriterMetrics writerMetrics{};
+ ReaderMetrics readerMetrics{};
int64_t readInputTime = 0;
int64_t elapsedTime = 0;
{
ScopedTimer timer(&elapsedTime);
for (auto _ : state) {
auto resultIter = getInputIteratorFromFileReader(inputFile, readerType);
- runShuffle(runtime, listenerPtr, resultIter, writerMetrics);
+ runShuffle(runtime, listenerPtr, resultIter, writerMetrics, readerMetrics, FLAGS_run_shuffle_read);
auto reader = static_cast<FileReaderIterator*>(resultIter->getInputIter());
readInputTime += reader->getCollectBatchTime();
}
}
- updateBenchmarkMetrics(state, elapsedTime, readInputTime, writerMetrics);
+ updateBenchmarkMetrics(state, elapsedTime, readInputTime, writerMetrics, readerMetrics);
Runtime::release(runtime);
};
int main(int argc, char** argv) {
- ::benchmark::Initialize(&argc, argv);
gflags::ParseCommandLineFlags(&argc, &argv, true);
+ std::ostringstream ss;
+ ss << "Setting flags from command line args: " << std::endl;
+ std::vector<google::CommandLineFlagInfo> flags;
+ google::GetAllFlags(&flags);
+ auto filename = std::filesystem::path(__FILE__).filename();
+ for (const auto& flag : flags) {
+ if (std::filesystem::path(flag.filename).filename() == filename) {
+ ss << " FLAGS_" << flag.name << ": default = " << flag.default_value
<< ", current = " << flag.current_value
+ << std::endl;
+ }
+ }
+ LOG(WARNING) << ss.str();
+
+ ::benchmark::Initialize(&argc, argv);
+
// Init Velox backend.
auto backendConf = gluten::defaultConf();
auto sessionConf = gluten::defaultConf();
- backendConf.insert({gluten::kSparkBatchSize, std::to_string(FLAGS_batch_size)});
+ backendConf.insert({gluten::kDebugModeEnabled, std::to_string(FLAGS_debug_mode)});
+ backendConf.insert({gluten::kGlogVerboseLevel, std::to_string(FLAGS_v)});
+ backendConf.insert({gluten::kGlogSeverityLevel, std::to_string(FLAGS_minloglevel)});
if (!FLAGS_conf.empty()) {
abortIfFileNotExists(FLAGS_conf);
std::ifstream file(FLAGS_conf);
@@ -425,7 +513,7 @@ int main(int argc, char** argv) {
std::vector<std::string> dataFiles{};
if (FLAGS_run_example) {
- LOG(INFO) << "Running example...";
+ LOG(WARNING) << "Running example...";
dataFiles.resize(2);
try {
substraitJsonFile = getGeneratedFilePath("example.json");
@@ -484,33 +572,23 @@ int main(int argc, char** argv) {
if (!errorMsg.empty()) {
LOG(ERROR) << "Incorrect usage: " << errorMsg << std::endl
- << "If simulating a first stage, the usage is:" << std::endl
- << "./generic_benchmark "
- << "--plan /absolute-path/to/substrait_json_file "
- << "--split
/absolute-path/to/split_json_file_1,/abosolute-path/to/split_json_file_2,..."
- << "--data
/absolute-path/to/data_file_1,/absolute-path/to/data_file_2,..." << std::endl
- << "If simulating a middle stage, the usage is:" << std::endl
- << "./generic_benchmark "
- << "--plan /absolute-path/to/substrait_json_file "
- << "--data
/absolute-path/to/data_file_1,/absolute-path/to/data_file_2,...";
- LOG(ERROR) << "*** Please check docs/developers/MicroBenchmarks.md for
the full usage. ***";
+ << "*** Please check docs/developers/MicroBenchmarks.md for
the full usage. ***";
::benchmark::Shutdown();
std::exit(EXIT_FAILURE);
}
}
- // Check whether input files exist.
- LOG(INFO) << "Using substrait json file: " << std::endl << substraitJsonFile;
+ LOG(WARNING) << "Using substrait json file: " << std::endl <<
substraitJsonFile;
if (!splitFiles.empty()) {
- LOG(INFO) << "Using " << splitFiles.size() << " input split file(s): ";
+ LOG(WARNING) << "Using " << splitFiles.size() << " input split file(s): ";
for (const auto& splitFile : splitFiles) {
- LOG(INFO) << splitFile;
+ LOG(WARNING) << splitFile;
}
}
if (!dataFiles.empty()) {
- LOG(INFO) << "Using " << dataFiles.size() << " input data file(s): ";
+ LOG(WARNING) << "Using " << dataFiles.size() << " input data file(s): ";
for (const auto& dataFile : dataFiles) {
- LOG(INFO) << dataFile;
+ LOG(WARNING) << dataFile;
}
}
@@ -528,37 +606,28 @@ int main(int argc, char** argv) {
setUpBenchmark(bm); \
} while (0)
-#define SHUFFLE_WRITE_BENCHMARK(READER_TYPE) \
- do { \
- auto* bm = \
- ::benchmark::RegisterBenchmark("ShuffleWrite", BM_ShuffleWrite, dataFiles[0], runtimeFactory, READER_TYPE) \
- ->MeasureProcessCPUTime() \
- ->UseRealTime(); \
- setUpBenchmark(bm); \
+#define SHUFFLE_WRITE_READ_BENCHMARK(READER_TYPE) \
+ do { \
+ auto* bm = ::benchmark::RegisterBenchmark( \
+ "ShuffleWriteRead", BM_ShuffleWriteRead, dataFiles[0], runtimeFactory, READER_TYPE) \
+ ->MeasureProcessCPUTime() \
+ ->UseRealTime(); \
+ setUpBenchmark(bm); \
} while (0)
- LOG(INFO) << "Using options: ";
- LOG(INFO) << "threads: " << FLAGS_threads;
- LOG(INFO) << "iterations: " << FLAGS_iterations;
- LOG(INFO) << "cpu: " << FLAGS_cpu;
- LOG(INFO) << "print_result: " << FLAGS_print_result;
- LOG(INFO) << "save_output: " << FLAGS_save_output;
- LOG(INFO) << "batch_size: " << FLAGS_batch_size;
- LOG(INFO) << "write_path: " << FLAGS_write_path;
-
if (dataFiles.empty()) {
GENERIC_BENCHMARK(FileReaderType::kNone);
} else {
FileReaderType readerType;
if (FLAGS_scan_mode == "buffered") {
readerType = FileReaderType::kBuffered;
- LOG(INFO) << "Using buffered mode for reading parquet data.";
+ LOG(WARNING) << "Using buffered mode for reading parquet data.";
} else {
readerType = FileReaderType::kStream;
- LOG(INFO) << "Using stream mode for reading parquet data.";
+ LOG(WARNING) << "Using stream mode for reading parquet data.";
}
if (FLAGS_run_shuffle) {
- SHUFFLE_WRITE_BENCHMARK(readerType);
+ SHUFFLE_WRITE_READ_BENCHMARK(readerType);
} else {
GENERIC_BENCHMARK(readerType);
}
diff --git a/cpp/velox/benchmarks/common/BenchmarkUtils.cc b/cpp/velox/benchmarks/common/BenchmarkUtils.cc
index c3baa2f33..345f9da8e 100644
--- a/cpp/velox/benchmarks/common/BenchmarkUtils.cc
+++ b/cpp/velox/benchmarks/common/BenchmarkUtils.cc
@@ -159,7 +159,11 @@ setLocalDirsAndDataFileFromEnv(std::string& dataFile, std::vector<std::string>&
// Set local dirs.
auto joinedDirs = std::string(joinedDirsC);
// Split local dirs and use thread id to choose one directory for data file.
- localDirs = gluten::splitPaths(joinedDirs);
+ auto dirs = gluten::splitPaths(joinedDirs);
+ for (const auto& dir : dirs) {
+ localDirs.push_back(arrow::fs::internal::ConcatAbstractPath(dir, "temp_shuffle_" + generateUuid()));
+ std::filesystem::create_directory(localDirs.back());
+ }
size_t id = std::hash<std::thread::id>{}(std::this_thread::get_id()) % localDirs.size();
ARROW_ASSIGN_OR_RAISE(dataFile, gluten::createTempShuffleFile(localDirs[id]));
} else {
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
index e165d4a91..5c6c4470b 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
@@ -1404,7 +1404,7 @@ arrow::Result<uint32_t> VeloxHashShuffleWriter::partitionBufferSizeAfterShrink(u
arrow::Status VeloxHashShuffleWriter::preAllocPartitionBuffers(uint32_t preAllocBufferSize) {
for (auto& pid : partitionUsed_) {
auto newSize = std::max(preAllocBufferSize, partition2RowCount_[pid]);
- DLOG_IF(INFO, partitionBufferSize_[pid] != newSize)
+ LOG_IF(WARNING, partitionBufferSize_[pid] != newSize)
<< "Actual partition buffer size - current: " <<
partitionBufferSize_[pid] << ", newSize: " << newSize
<< std::endl;
// Make sure the size to be allocated is larger than the size to be filled.
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc
index ab93d9a33..4d002499c 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -312,8 +312,7 @@ std::shared_ptr<ColumnarBatch> VeloxHashShuffleReaderDeserializer::next() {
if (hasComplexType_) {
uint32_t numRows;
GLUTEN_ASSIGN_OR_THROW(
- auto arrowBuffers,
- BlockPayload::deserialize(in_.get(), schema_, codec_, memoryPool_, numRows, decompressTime_));
+ auto arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, memoryPool_, numRows, decompressTime_));
if (numRows == 0) {
// Reach EOS.
return nullptr;
@@ -332,7 +331,7 @@ std::shared_ptr<ColumnarBatch> VeloxHashShuffleReaderDeserializer::next() {
uint32_t numRows = 0;
while (!merged_ || merged_->numRows() < batchSize_) {
GLUTEN_ASSIGN_OR_THROW(
- arrowBuffers, BlockPayload::deserialize(in_.get(), schema_, codec_, memoryPool_, numRows, decompressTime_));
+ arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, memoryPool_, numRows, decompressTime_));
if (numRows == 0) {
reachEos_ = true;
break;
@@ -401,7 +400,7 @@ std::shared_ptr<ColumnarBatch> VeloxSortShuffleReaderDeserializer::next() {
while (cachedRows_ < batchSize_) {
uint32_t numRows;
GLUTEN_ASSIGN_OR_THROW(
- auto arrowBuffers, BlockPayload::deserialize(in_.get(), schema_, codec_, arrowPool_, numRows, decompressTime_));
+ auto arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, arrowPool_, numRows, decompressTime_));
if (numRows == 0) {
reachEos_ = true;
diff --git a/docs/developers/MicroBenchmarks.md b/docs/developers/MicroBenchmarks.md
index 21f222b42..bd469f34c 100644
--- a/docs/developers/MicroBenchmarks.md
+++ b/docs/developers/MicroBenchmarks.md
@@ -320,23 +320,44 @@ cd /path/to/gluten/cpp/build/velox/benchmarks
--threads 1
```
-### Run shuffle write task only
+### Run shuffle write/read task only
Developers can only run shuffle write task via specifying `--run-shuffle` and `--data` options.
The parquet format input will be read from arrow-parquet reader and sent to shuffle writer.
-This option is similar to the `--with-shuffle` option, but it doesn't require the plan and split files.
+The `--run-shuffle` option is similar to the `--with-shuffle` option, but it doesn't require the plan and split files.
The round-robin partitioner is used by default. Besides, random partitioning can be used for testing purpose.
By specifying option `--partitioning random`, the partitioner will generate a random partition id for each row.
+To evaluate the shuffle reader performance, developers can set the `--run-shuffle-read` option to add a read process after the write task finishes.
+
+The command below runs shuffle write/read in a single thread, using the sort shuffle writer with 40000 partitions and random partition ids.
```shell
cd /path/to/gluten/cpp/build/velox/benchmarks
./generic_benchmark \
--run-shuffle \
+--run-shuffle-read \
--data /path/to/input_for_shuffle_write.parquet \
--shuffle-writer sort \
+--partitioning random \
+--shuffle-partitions 40000 \
--threads 1
```
+The output should look like:
+
+```
+-------------------------------------------------------------------------------------------------------------------------
+Benchmark                                                                 Time             CPU   Iterations UserCounters...
+-------------------------------------------------------------------------------------------------------------------------
+ShuffleWriteRead/iterations:1/process_time/real_time/threads:1  121637629714 ns  121309450910 ns            1 elapsed_time=121.638G read_input_time=25.2637G shuffle_compress_time=10.8311G shuffle_decompress_time=4.04055G shuffle_deserialize_time=7.24289G shuffle_spill_time=0 shuffle_split_time=69.9098G shuffle_write_time=2.03274G
+```
+
+## Enable debug mode
+
+`spark.gluten.sql.debug` (debug mode) is set to false by default, so the google glog levels are limited to print only `WARNING` or higher severity logs.
+Unless `spark.gluten.sql.debug` is set in the INI file via `--conf`, the logging behavior is the same as with debug mode off.
+Developers can use the `--debug-mode` command line flag to turn on debug mode when needed, and set the verbosity/severity level via the command line flags `--v` and `--minloglevel`. Note that constructing and destructing log strings can be very time-consuming, which may cause benchmark times to be inaccurate.
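
As an illustrative sketch only (not part of this patch), debug mode can be combined with the usual plan and data inputs; the paths below are placeholders, and `--v`/`--minloglevel` are the standard glog flags referenced above:

```shell
cd /path/to/gluten/cpp/build/velox/benchmarks
./generic_benchmark \
--plan /path/to/substrait_plan.json \
--data /path/to/input.parquet \
--debug-mode \
--v 1 \
--minloglevel 0 \
--threads 1
```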
+
## Simulate write tasks
The last operator for a write task is a file write operator, and the output from Velox pipeline only
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
index 26527e1c8..b3b7603ac 100644
--- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
@@ -212,9 +212,13 @@ object ColumnarShuffleExchangeExec extends Logging {
}
def useSortBasedShuffle(partitioning: Partitioning, output: Seq[Attribute]): Boolean = {
+ val conf = GlutenConfig.getConf
+ lazy val isCelebornSortBasedShuffle = conf.isUseCelebornShuffleManager &&
+ conf.celebornShuffleWriterType == GlutenConfig.GLUTEN_SORT_SHUFFLE_WRITER
partitioning != SinglePartition &&
(partitioning.numPartitions >= GlutenConfig.getConf.columnarShuffleSortPartitionsThreshold ||
- output.size >= GlutenConfig.getConf.columnarShuffleSortColumnsThreshold)
+ output.size >= GlutenConfig.getConf.columnarShuffleSortColumnsThreshold) ||
+ isCelebornSortBasedShuffle
}
class DummyPairRDDWithPartitions(@transient private val sc: SparkContext,
numPartitions: Int)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]