This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new d746f0f72 [VL] Add reader process to shuffle benchmark (#6682)
d746f0f72 is described below

commit d746f0f7296a0c84fb046cca77d605f4a927c6f7
Author: Rong Ma <[email protected]>
AuthorDate: Tue Aug 6 15:38:01 2024 +0800

    [VL] Add reader process to shuffle benchmark (#6682)
---
 cpp/core/shuffle/LocalPartitionWriter.cc           |  18 +-
 cpp/core/shuffle/Payload.cc                        |   1 -
 cpp/core/shuffle/Payload.h                         |   1 -
 cpp/velox/benchmarks/GenericBenchmark.cc           | 223 ++++++++++++++-------
 cpp/velox/benchmarks/common/BenchmarkUtils.cc      |   6 +-
 cpp/velox/shuffle/VeloxHashShuffleWriter.cc        |   2 +-
 cpp/velox/shuffle/VeloxShuffleReader.cc            |   7 +-
 docs/developers/MicroBenchmarks.md                 |  25 ++-
 .../execution/ColumnarShuffleExchangeExec.scala    |   6 +-
 9 files changed, 198 insertions(+), 91 deletions(-)

diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc 
b/cpp/core/shuffle/LocalPartitionWriter.cc
index fc5d758f8..031be791b 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.cc
+++ b/cpp/core/shuffle/LocalPartitionWriter.cc
@@ -609,11 +609,23 @@ arrow::Status LocalPartitionWriter::evict(uint32_t 
partitionId, std::unique_ptr<
 
   if (lastEvictPid_ != -1 && partitionId < lastEvictPid_) {
     RETURN_NOT_OK(finishSpill(true));
+    lastEvictPid_ = -1;
   }
-  lastEvictPid_ = partitionId;
-
   RETURN_NOT_OK(requestSpill(stop));
-  RETURN_NOT_OK(spiller_->spill(partitionId, std::move(blockPayload)));
+
+  if (!stop) {
+    RETURN_NOT_OK(spiller_->spill(partitionId, std::move(blockPayload)));
+  } else {
+    if (spills_.size() > 0) {
+      for (auto pid = lastEvictPid_ + 1; pid <= partitionId; ++pid) {
+        auto bytesEvicted = totalBytesEvicted_;
+        RETURN_NOT_OK(mergeSpills(pid));
+        partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted;
+      }
+    }
+    RETURN_NOT_OK(spiller_->spill(partitionId, std::move(blockPayload)));
+  }
+  lastEvictPid_ = partitionId;
   return arrow::Status::OK();
 }
 
diff --git a/cpp/core/shuffle/Payload.cc b/cpp/core/shuffle/Payload.cc
index daeef24ce..b8d8274cb 100644
--- a/cpp/core/shuffle/Payload.cc
+++ b/cpp/core/shuffle/Payload.cc
@@ -293,7 +293,6 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> 
BlockPayload::readBufferAt(uint32_
 
 arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> 
BlockPayload::deserialize(
     arrow::io::InputStream* inputStream,
-    const std::shared_ptr<arrow::Schema>& schema,
     const std::shared_ptr<arrow::util::Codec>& codec,
     arrow::MemoryPool* pool,
     uint32_t& numRows,
diff --git a/cpp/core/shuffle/Payload.h b/cpp/core/shuffle/Payload.h
index 4c53065a6..0a317d9c3 100644
--- a/cpp/core/shuffle/Payload.h
+++ b/cpp/core/shuffle/Payload.h
@@ -88,7 +88,6 @@ class BlockPayload final : public Payload {
 
   static arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> 
deserialize(
       arrow::io::InputStream* inputStream,
-      const std::shared_ptr<arrow::Schema>& schema,
       const std::shared_ptr<arrow::util::Codec>& codec,
       arrow::MemoryPool* pool,
       uint32_t& numRows,
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc 
b/cpp/velox/benchmarks/GenericBenchmark.cc
index a7776f9f1..4b46dbd59 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -30,6 +30,7 @@
 #include "compute/VeloxPlanConverter.h"
 #include "compute/VeloxRuntime.h"
 #include "config/GlutenConfig.h"
+#include "config/VeloxConfig.h"
 #include "shuffle/LocalPartitionWriter.h"
 #include "shuffle/VeloxShuffleWriter.h"
 #include "shuffle/rss/RssPartitionWriter.h"
@@ -44,22 +45,23 @@ using namespace gluten;
 
 namespace {
 
+DEFINE_bool(run_example, false, "Run the example and exit.");
 DEFINE_bool(print_result, true, "Print result for execution");
 DEFINE_string(save_output, "", "Path to parquet file for saving the task 
output iterator");
 DEFINE_bool(with_shuffle, false, "Add shuffle split at end.");
+DEFINE_bool(run_shuffle, false, "Only run shuffle write.");
+DEFINE_bool(run_shuffle_read, false, "Whether to run shuffle read when 
run_shuffle is true.");
+DEFINE_string(shuffle_writer, "hash", "Shuffle writer type. Can be hash or 
sort");
 DEFINE_string(
     partitioning,
     "rr",
     "Short partitioning name. Valid options are rr, hash, range, single, 
random (only for test purpose)");
-DEFINE_string(shuffle_writer, "hash", "Shuffle writer type. Can be hash or 
sort");
 DEFINE_bool(rss, false, "Mocking rss.");
 DEFINE_string(
     compression,
     "lz4",
     "Specify the compression codec. Valid options are lz4, zstd, qat_gzip, 
qat_zstd, iaa_gzip");
 DEFINE_int32(shuffle_partitions, 200, "Number of shuffle split (reducer) 
partitions");
-DEFINE_bool(run_shuffle, false, "Only run shuffle write.");
-DEFINE_bool(run_example, false, "Run the example and exit.");
 
 DEFINE_string(plan, "", "Path to input json file of the substrait plan.");
 DEFINE_string(
@@ -76,15 +78,21 @@ DEFINE_string(
     "Scan mode for reading parquet data."
     "'stream' mode: Input file scan happens inside of the pipeline."
     "'buffered' mode: First read all data into memory and feed the pipeline 
with it.");
+DEFINE_bool(debug_mode, false, "Whether to enable debug mode. Same as setting 
`spark.gluten.sql.debug`");
 
 struct WriterMetrics {
-  int64_t splitTime;
-  int64_t evictTime;
-  int64_t writeTime;
-  int64_t compressTime;
+  int64_t splitTime{0};
+  int64_t evictTime{0};
+  int64_t writeTime{0};
+  int64_t compressTime{0};
+
+  int64_t bytesSpilled{0};
+  int64_t bytesWritten{0};
+};
 
- public:
-  explicit WriterMetrics() : splitTime(0), evictTime(0), writeTime(0), 
compressTime(0) {}
+struct ReaderMetrics {
+  int64_t decompressTime{0};
+  int64_t deserializeTime{0};
 };
 
 void setUpBenchmark(::benchmark::internal::Benchmark* bm) {
@@ -98,9 +106,10 @@ void setUpBenchmark(::benchmark::internal::Benchmark* bm) {
   }
 }
 
-std::shared_ptr<VeloxShuffleWriter>
-createShuffleWriter(Runtime* runtime, const std::string& dataFile, const 
std::vector<std::string>& localDirs) {
+PartitionWriterOptions createPartitionWriterOptions() {
   PartitionWriterOptions partitionWriterOptions{};
+  // Disable writer's merge.
+  partitionWriterOptions.mergeThreshold = 0;
 
   // Configure compression.
   if (FLAGS_compression == "lz4") {
@@ -121,27 +130,39 @@ createShuffleWriter(Runtime* runtime, const std::string& 
dataFile, const std::ve
     partitionWriterOptions.codecBackend = CodecBackend::IAA;
     partitionWriterOptions.compressionType = arrow::Compression::GZIP;
   }
+  return partitionWriterOptions;
+}
 
+std::unique_ptr<PartitionWriter> createPartitionWriter(
+    Runtime* runtime,
+    PartitionWriterOptions options,
+    const std::string& dataFile,
+    const std::vector<std::string>& localDirs) {
   std::unique_ptr<PartitionWriter> partitionWriter;
   if (FLAGS_rss) {
     auto rssClient = std::make_unique<LocalRssClient>(dataFile);
     partitionWriter = std::make_unique<RssPartitionWriter>(
         FLAGS_shuffle_partitions,
-        std::move(partitionWriterOptions),
+        std::move(options),
         runtime->memoryManager()->getArrowMemoryPool(),
         std::move(rssClient));
   } else {
     partitionWriter = std::make_unique<LocalPartitionWriter>(
         FLAGS_shuffle_partitions,
-        std::move(partitionWriterOptions),
+        std::move(options),
         runtime->memoryManager()->getArrowMemoryPool(),
         dataFile,
         localDirs);
   }
+  return partitionWriter;
+}
 
+std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(
+    Runtime* runtime,
+    std::unique_ptr<PartitionWriter> partitionWriter) {
   auto options = ShuffleWriterOptions{};
   options.partitioning = gluten::toPartitioning(FLAGS_partitioning);
-  if (FLAGS_rss) {
+  if (FLAGS_rss || FLAGS_shuffle_writer == "rss_sort") {
     options.shuffleWriterType = gluten::kRssSortShuffle;
   } else if (FLAGS_shuffle_writer == "sort") {
     options.shuffleWriterType = gluten::kSortShuffle;
@@ -163,6 +184,8 @@ void populateWriterMetrics(
   if (splitTime > 0) {
     metrics.splitTime += splitTime;
   }
+  metrics.bytesWritten += shuffleWriter->totalBytesWritten();
+  metrics.bytesSpilled += shuffleWriter->totalBytesEvicted();
 }
 
 void setCpu(::benchmark::State& state) {
@@ -171,7 +194,7 @@ void setCpu(::benchmark::State& state) {
   if (FLAGS_cpu != -1) {
     cpu += FLAGS_cpu;
   }
-  LOG(INFO) << "Setting CPU for thread " << state.thread_index() << " to " << 
cpu;
+  LOG(WARNING) << "Setting CPU for thread " << state.thread_index() << " to " 
<< cpu;
   gluten::setCpu(cpu);
 }
 
@@ -179,26 +202,56 @@ void runShuffle(
     Runtime* runtime,
     BenchmarkAllocationListener* listener,
     const std::shared_ptr<gluten::ResultIterator>& resultIter,
-    WriterMetrics& metrics) {
+    WriterMetrics& writerMetrics,
+    ReaderMetrics& readerMetrics,
+    bool readAfterWrite) {
   std::string dataFile;
   std::vector<std::string> localDirs;
   bool isFromEnv;
   GLUTEN_THROW_NOT_OK(setLocalDirsAndDataFileFromEnv(dataFile, localDirs, 
isFromEnv));
 
-  auto shuffleWriter = createShuffleWriter(runtime, dataFile, localDirs);
+  auto partitionWriterOptions = createPartitionWriterOptions();
+  auto partitionWriter = createPartitionWriter(runtime, 
partitionWriterOptions, dataFile, localDirs);
+  auto shuffleWriter = createShuffleWriter(runtime, 
std::move(partitionWriter));
   listener->setShuffleWriter(shuffleWriter.get());
 
   int64_t totalTime = 0;
+  std::shared_ptr<ArrowSchema> cSchema;
   {
     gluten::ScopedTimer timer(&totalTime);
     while (resultIter->hasNext()) {
-      GLUTEN_THROW_NOT_OK(
-          shuffleWriter->write(resultIter->next(), ShuffleWriter::kMaxMemLimit 
- shuffleWriter->cachedPayloadSize()));
+      auto cb = resultIter->next();
+      if (!cSchema) {
+        cSchema = cb->exportArrowSchema();
+      }
+      GLUTEN_THROW_NOT_OK(shuffleWriter->write(cb, ShuffleWriter::kMaxMemLimit 
- shuffleWriter->cachedPayloadSize()));
     }
     GLUTEN_THROW_NOT_OK(shuffleWriter->stop());
   }
 
-  populateWriterMetrics(shuffleWriter, totalTime, metrics);
+  populateWriterMetrics(shuffleWriter, totalTime, writerMetrics);
+
+  if (readAfterWrite && cSchema) {
+    auto readerOptions = ShuffleReaderOptions{};
+    readerOptions.shuffleWriterType = 
shuffleWriter->options().shuffleWriterType;
+    readerOptions.compressionType = partitionWriterOptions.compressionType;
+    readerOptions.codecBackend = partitionWriterOptions.codecBackend;
+    readerOptions.compressionTypeStr = 
partitionWriterOptions.compressionTypeStr;
+
+    std::shared_ptr<arrow::Schema> schema =
+        gluten::arrowGetOrThrow(arrow::ImportSchema(reinterpret_cast<struct 
ArrowSchema*>(cSchema.get())));
+    auto reader = runtime->createShuffleReader(schema, readerOptions);
+
+    GLUTEN_ASSIGN_OR_THROW(auto in, arrow::io::ReadableFile::Open(dataFile));
+    // Read all partitions.
+    auto iter = reader->readStream(in);
+    while (iter->hasNext()) {
+      // Read and discard.
+      auto cb = iter->next();
+    }
+    readerMetrics.decompressTime = reader->getDecompressTime();
+    readerMetrics.deserializeTime = reader->getDeserializeTime();
+  }
   // Cleanup shuffle outputs
   cleanupShuffleOutput(dataFile, localDirs, isFromEnv);
 }
@@ -207,20 +260,37 @@ void updateBenchmarkMetrics(
     ::benchmark::State& state,
     const int64_t& elapsedTime,
     const int64_t& readInputTime,
-    const WriterMetrics& writerMetrics) {
+    const WriterMetrics& writerMetrics,
+    const ReaderMetrics& readerMetrics) {
   state.counters["read_input_time"] =
       benchmark::Counter(readInputTime, benchmark::Counter::kAvgIterations, 
benchmark::Counter::OneK::kIs1000);
   state.counters["elapsed_time"] =
       benchmark::Counter(elapsedTime, benchmark::Counter::kAvgIterations, 
benchmark::Counter::OneK::kIs1000);
 
-  state.counters["shuffle_write_time"] = benchmark::Counter(
-      writerMetrics.writeTime, benchmark::Counter::kAvgIterations, 
benchmark::Counter::OneK::kIs1000);
-  state.counters["shuffle_spill_time"] = benchmark::Counter(
-      writerMetrics.evictTime, benchmark::Counter::kAvgIterations, 
benchmark::Counter::OneK::kIs1000);
-  state.counters["shuffle_split_time"] = benchmark::Counter(
-      writerMetrics.splitTime, benchmark::Counter::kAvgIterations, 
benchmark::Counter::OneK::kIs1000);
-  state.counters["shuffle_compress_time"] = benchmark::Counter(
-      writerMetrics.compressTime, benchmark::Counter::kAvgIterations, 
benchmark::Counter::OneK::kIs1000);
+  if (FLAGS_run_shuffle || FLAGS_with_shuffle) {
+    state.counters["shuffle_write_time"] = benchmark::Counter(
+        writerMetrics.writeTime, benchmark::Counter::kAvgIterations, 
benchmark::Counter::OneK::kIs1000);
+    state.counters["shuffle_spill_time"] = benchmark::Counter(
+        writerMetrics.evictTime, benchmark::Counter::kAvgIterations, 
benchmark::Counter::OneK::kIs1000);
+    state.counters["shuffle_compress_time"] = benchmark::Counter(
+        writerMetrics.compressTime, benchmark::Counter::kAvgIterations, 
benchmark::Counter::OneK::kIs1000);
+    state.counters["shuffle_decompress_time"] = benchmark::Counter(
+        readerMetrics.decompressTime, benchmark::Counter::kAvgIterations, 
benchmark::Counter::OneK::kIs1000);
+    state.counters["shuffle_deserialize_time"] = benchmark::Counter(
+        readerMetrics.deserializeTime, benchmark::Counter::kAvgIterations, 
benchmark::Counter::OneK::kIs1000);
+
+    auto splitTime = writerMetrics.splitTime;
+    if (FLAGS_scan_mode == "stream") {
+      splitTime -= readInputTime;
+    }
+    state.counters["shuffle_split_time"] =
+        benchmark::Counter(splitTime, benchmark::Counter::kAvgIterations, 
benchmark::Counter::OneK::kIs1000);
+
+    state.counters["shuffle_spilled_bytes"] = benchmark::Counter(
+        writerMetrics.bytesSpilled, benchmark::Counter::kAvgIterations, 
benchmark::Counter::OneK::kIs1024);
+    state.counters["shuffle_write_bytes"] = benchmark::Counter(
+        writerMetrics.bytesWritten, benchmark::Counter::kAvgIterations, 
benchmark::Counter::OneK::kIs1024);
+  }
 }
 
 } // namespace
@@ -246,6 +316,7 @@ auto BM_Generic = [](::benchmark::State& state,
   }
 
   WriterMetrics writerMetrics{};
+  ReaderMetrics readerMetrics{};
   int64_t readInputTime = 0;
   int64_t elapsedTime = 0;
 
@@ -275,7 +346,7 @@ auto BM_Generic = [](::benchmark::State& state,
       listenerPtr->setIterator(resultIter.get());
 
       if (FLAGS_with_shuffle) {
-        runShuffle(runtime, listenerPtr, resultIter, writerMetrics);
+        runShuffle(runtime, listenerPtr, resultIter, writerMetrics, 
readerMetrics, false);
       } else {
         // May write the output into file.
         auto veloxPlan = 
dynamic_cast<gluten::VeloxRuntime*>(runtime)->getVeloxPlan();
@@ -299,7 +370,7 @@ auto BM_Generic = [](::benchmark::State& state,
             return;
           }
           if (FLAGS_print_result) {
-            LOG(INFO) << maybeBatch.ValueOrDie()->ToString();
+            LOG(WARNING) << maybeBatch.ValueOrDie()->ToString();
           }
           if (!FLAGS_save_output.empty()) {
             
GLUTEN_THROW_NOT_OK(writer.writeInBatches(maybeBatch.ValueOrDie()));
@@ -322,18 +393,18 @@ auto BM_Generic = [](::benchmark::State& state,
       const auto* task = rawIter->task();
       const auto* planNode = rawIter->veloxPlan();
       auto statsStr = facebook::velox::exec::printPlanWithStats(*planNode, 
task->taskStats(), true);
-      LOG(INFO) << statsStr;
+      LOG(WARNING) << statsStr;
     }
   }
 
-  updateBenchmarkMetrics(state, elapsedTime, readInputTime, writerMetrics);
+  updateBenchmarkMetrics(state, elapsedTime, readInputTime, writerMetrics, 
readerMetrics);
   Runtime::release(runtime);
 };
 
-auto BM_ShuffleWrite = [](::benchmark::State& state,
-                          const std::string& inputFile,
-                          RuntimeFactory runtimeFactory,
-                          FileReaderType readerType) {
+auto BM_ShuffleWriteRead = [](::benchmark::State& state,
+                              const std::string& inputFile,
+                              RuntimeFactory runtimeFactory,
+                              FileReaderType readerType) {
   setCpu(state);
 
   auto listener = 
std::make_unique<BenchmarkAllocationListener>(FLAGS_memory_limit);
@@ -341,31 +412,48 @@ auto BM_ShuffleWrite = [](::benchmark::State& state,
   auto runtime = runtimeFactory(std::move(listener));
 
   WriterMetrics writerMetrics{};
+  ReaderMetrics readerMetrics{};
   int64_t readInputTime = 0;
   int64_t elapsedTime = 0;
   {
     ScopedTimer timer(&elapsedTime);
     for (auto _ : state) {
       auto resultIter = getInputIteratorFromFileReader(inputFile, readerType);
-      runShuffle(runtime, listenerPtr, resultIter, writerMetrics);
+      runShuffle(runtime, listenerPtr, resultIter, writerMetrics, 
readerMetrics, FLAGS_run_shuffle_read);
 
       auto reader = 
static_cast<FileReaderIterator*>(resultIter->getInputIter());
       readInputTime += reader->getCollectBatchTime();
     }
   }
 
-  updateBenchmarkMetrics(state, elapsedTime, readInputTime, writerMetrics);
+  updateBenchmarkMetrics(state, elapsedTime, readInputTime, writerMetrics, 
readerMetrics);
   Runtime::release(runtime);
 };
 
 int main(int argc, char** argv) {
-  ::benchmark::Initialize(&argc, argv);
   gflags::ParseCommandLineFlags(&argc, &argv, true);
 
+  std::ostringstream ss;
+  ss << "Setting flags from command line args: " << std::endl;
+  std::vector<google::CommandLineFlagInfo> flags;
+  google::GetAllFlags(&flags);
+  auto filename = std::filesystem::path(__FILE__).filename();
+  for (const auto& flag : flags) {
+    if (std::filesystem::path(flag.filename).filename() == filename) {
+      ss << "    FLAGS_" << flag.name << ": default = " << flag.default_value 
<< ", current = " << flag.current_value
+         << std::endl;
+    }
+  }
+  LOG(WARNING) << ss.str();
+
+  ::benchmark::Initialize(&argc, argv);
+
   // Init Velox backend.
   auto backendConf = gluten::defaultConf();
   auto sessionConf = gluten::defaultConf();
-  backendConf.insert({gluten::kSparkBatchSize, 
std::to_string(FLAGS_batch_size)});
+  backendConf.insert({gluten::kDebugModeEnabled, 
std::to_string(FLAGS_debug_mode)});
+  backendConf.insert({gluten::kGlogVerboseLevel, std::to_string(FLAGS_v)});
+  backendConf.insert({gluten::kGlogSeverityLevel, 
std::to_string(FLAGS_minloglevel)});
   if (!FLAGS_conf.empty()) {
     abortIfFileNotExists(FLAGS_conf);
     std::ifstream file(FLAGS_conf);
@@ -425,7 +513,7 @@ int main(int argc, char** argv) {
   std::vector<std::string> dataFiles{};
 
   if (FLAGS_run_example) {
-    LOG(INFO) << "Running example...";
+    LOG(WARNING) << "Running example...";
     dataFiles.resize(2);
     try {
       substraitJsonFile = getGeneratedFilePath("example.json");
@@ -484,33 +572,23 @@ int main(int argc, char** argv) {
 
     if (!errorMsg.empty()) {
       LOG(ERROR) << "Incorrect usage: " << errorMsg << std::endl
-                 << "If simulating a first stage, the usage is:" << std::endl
-                 << "./generic_benchmark "
-                 << "--plan /absolute-path/to/substrait_json_file "
-                 << "--split 
/absolute-path/to/split_json_file_1,/abosolute-path/to/split_json_file_2,..."
-                 << "--data 
/absolute-path/to/data_file_1,/absolute-path/to/data_file_2,..." << std::endl
-                 << "If simulating a middle stage, the usage is:" << std::endl
-                 << "./generic_benchmark "
-                 << "--plan /absolute-path/to/substrait_json_file "
-                 << "--data 
/absolute-path/to/data_file_1,/absolute-path/to/data_file_2,...";
-      LOG(ERROR) << "*** Please check docs/developers/MicroBenchmarks.md for 
the full usage. ***";
+                 << "*** Please check docs/developers/MicroBenchmarks.md for 
the full usage. ***";
       ::benchmark::Shutdown();
       std::exit(EXIT_FAILURE);
     }
   }
 
-  // Check whether input files exist.
-  LOG(INFO) << "Using substrait json file: " << std::endl << substraitJsonFile;
+  LOG(WARNING) << "Using substrait json file: " << std::endl << 
substraitJsonFile;
   if (!splitFiles.empty()) {
-    LOG(INFO) << "Using " << splitFiles.size() << " input split file(s): ";
+    LOG(WARNING) << "Using " << splitFiles.size() << " input split file(s): ";
     for (const auto& splitFile : splitFiles) {
-      LOG(INFO) << splitFile;
+      LOG(WARNING) << splitFile;
     }
   }
   if (!dataFiles.empty()) {
-    LOG(INFO) << "Using " << dataFiles.size() << " input data file(s): ";
+    LOG(WARNING) << "Using " << dataFiles.size() << " input data file(s): ";
     for (const auto& dataFile : dataFiles) {
-      LOG(INFO) << dataFile;
+      LOG(WARNING) << dataFile;
     }
   }
 
@@ -528,37 +606,28 @@ int main(int argc, char** argv) {
     setUpBenchmark(bm);                                                        
                                    \
   } while (0)
 
-#define SHUFFLE_WRITE_BENCHMARK(READER_TYPE)                                   
                                    \
-  do {                                                                         
                                    \
-    auto* bm =                                                                 
                                    \
-        ::benchmark::RegisterBenchmark("ShuffleWrite", BM_ShuffleWrite, 
dataFiles[0], runtimeFactory, READER_TYPE) \
-            ->MeasureProcessCPUTime()                                          
                                    \
-            ->UseRealTime();                                                   
                                    \
-    setUpBenchmark(bm);                                                        
                                    \
+#define SHUFFLE_WRITE_READ_BENCHMARK(READER_TYPE)                              
                        \
+  do {                                                                         
                        \
+    auto* bm = ::benchmark::RegisterBenchmark(                                 
                        \
+                   "ShuffleWriteRead", BM_ShuffleWriteRead, dataFiles[0], 
runtimeFactory, READER_TYPE) \
+                   ->MeasureProcessCPUTime()                                   
                        \
+                   ->UseRealTime();                                            
                        \
+    setUpBenchmark(bm);                                                        
                        \
   } while (0)
 
-  LOG(INFO) << "Using options: ";
-  LOG(INFO) << "threads: " << FLAGS_threads;
-  LOG(INFO) << "iterations: " << FLAGS_iterations;
-  LOG(INFO) << "cpu: " << FLAGS_cpu;
-  LOG(INFO) << "print_result: " << FLAGS_print_result;
-  LOG(INFO) << "save_output: " << FLAGS_save_output;
-  LOG(INFO) << "batch_size: " << FLAGS_batch_size;
-  LOG(INFO) << "write_path: " << FLAGS_write_path;
-
   if (dataFiles.empty()) {
     GENERIC_BENCHMARK(FileReaderType::kNone);
   } else {
     FileReaderType readerType;
     if (FLAGS_scan_mode == "buffered") {
       readerType = FileReaderType::kBuffered;
-      LOG(INFO) << "Using buffered mode for reading parquet data.";
+      LOG(WARNING) << "Using buffered mode for reading parquet data.";
     } else {
       readerType = FileReaderType::kStream;
-      LOG(INFO) << "Using stream mode for reading parquet data.";
+      LOG(WARNING) << "Using stream mode for reading parquet data.";
     }
     if (FLAGS_run_shuffle) {
-      SHUFFLE_WRITE_BENCHMARK(readerType);
+      SHUFFLE_WRITE_READ_BENCHMARK(readerType);
     } else {
       GENERIC_BENCHMARK(readerType);
     }
diff --git a/cpp/velox/benchmarks/common/BenchmarkUtils.cc 
b/cpp/velox/benchmarks/common/BenchmarkUtils.cc
index c3baa2f33..345f9da8e 100644
--- a/cpp/velox/benchmarks/common/BenchmarkUtils.cc
+++ b/cpp/velox/benchmarks/common/BenchmarkUtils.cc
@@ -159,7 +159,11 @@ setLocalDirsAndDataFileFromEnv(std::string& dataFile, 
std::vector<std::string>&
     // Set local dirs.
     auto joinedDirs = std::string(joinedDirsC);
     // Split local dirs and use thread id to choose one directory for data 
file.
-    localDirs = gluten::splitPaths(joinedDirs);
+    auto dirs = gluten::splitPaths(joinedDirs);
+    for (const auto& dir : dirs) {
+      localDirs.push_back(arrow::fs::internal::ConcatAbstractPath(dir, 
"temp_shuffle_" + generateUuid()));
+      std::filesystem::create_directory(localDirs.back());
+    }
     size_t id = std::hash<std::thread::id>{}(std::this_thread::get_id()) % 
localDirs.size();
     ARROW_ASSIGN_OR_RAISE(dataFile, 
gluten::createTempShuffleFile(localDirs[id]));
   } else {
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc 
b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
index e165d4a91..5c6c4470b 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
@@ -1404,7 +1404,7 @@ arrow::Result<uint32_t> 
VeloxHashShuffleWriter::partitionBufferSizeAfterShrink(u
 arrow::Status VeloxHashShuffleWriter::preAllocPartitionBuffers(uint32_t 
preAllocBufferSize) {
   for (auto& pid : partitionUsed_) {
     auto newSize = std::max(preAllocBufferSize, partition2RowCount_[pid]);
-    DLOG_IF(INFO, partitionBufferSize_[pid] != newSize)
+    LOG_IF(WARNING, partitionBufferSize_[pid] != newSize)
         << "Actual partition buffer size - current: " << 
partitionBufferSize_[pid] << ", newSize: " << newSize
         << std::endl;
     // Make sure the size to be allocated is larger than the size to be filled.
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc 
b/cpp/velox/shuffle/VeloxShuffleReader.cc
index ab93d9a33..4d002499c 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -312,8 +312,7 @@ std::shared_ptr<ColumnarBatch> 
VeloxHashShuffleReaderDeserializer::next() {
   if (hasComplexType_) {
     uint32_t numRows;
     GLUTEN_ASSIGN_OR_THROW(
-        auto arrowBuffers,
-        BlockPayload::deserialize(in_.get(), schema_, codec_, memoryPool_, 
numRows, decompressTime_));
+        auto arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, 
memoryPool_, numRows, decompressTime_));
     if (numRows == 0) {
       // Reach EOS.
       return nullptr;
@@ -332,7 +331,7 @@ std::shared_ptr<ColumnarBatch> 
VeloxHashShuffleReaderDeserializer::next() {
   uint32_t numRows = 0;
   while (!merged_ || merged_->numRows() < batchSize_) {
     GLUTEN_ASSIGN_OR_THROW(
-        arrowBuffers, BlockPayload::deserialize(in_.get(), schema_, codec_, 
memoryPool_, numRows, decompressTime_));
+        arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, 
memoryPool_, numRows, decompressTime_));
     if (numRows == 0) {
       reachEos_ = true;
       break;
@@ -401,7 +400,7 @@ std::shared_ptr<ColumnarBatch> 
VeloxSortShuffleReaderDeserializer::next() {
   while (cachedRows_ < batchSize_) {
     uint32_t numRows;
     GLUTEN_ASSIGN_OR_THROW(
-        auto arrowBuffers, BlockPayload::deserialize(in_.get(), schema_, 
codec_, arrowPool_, numRows, decompressTime_));
+        auto arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, 
arrowPool_, numRows, decompressTime_));
 
     if (numRows == 0) {
       reachEos_ = true;
diff --git a/docs/developers/MicroBenchmarks.md 
b/docs/developers/MicroBenchmarks.md
index 21f222b42..bd469f34c 100644
--- a/docs/developers/MicroBenchmarks.md
+++ b/docs/developers/MicroBenchmarks.md
@@ -320,23 +320,44 @@ cd /path/to/gluten/cpp/build/velox/benchmarks
 --threads 1
 ```
 
-### Run shuffle write task only
+### Run shuffle write/read task only
 
 Developers can only run shuffle write task via specifying `--run-shuffle` and 
`--data` options.
 The parquet format input will be read from arrow-parquet reader and sent to 
shuffle writer.
-This option is similar to the `--with-shuffle` option, but it doesn't require 
the plan and split files.
+The `--run-shuffle` option is similar to the `--with-shuffle` option, but it 
doesn't require the plan and split files.
 The round-robin partitioner is used by default. Besides, random partitioning 
can be used for testing purpose.
 By specifying option `--partitioning random`, the partitioner will generate a 
random partition id for each row.
+To evaluate the shuffle reader performance, developers can set the 
`--run-shuffle-read` option to add a read phase after the write task finishes.
+
+The command below runs shuffle write/read in a single thread, using the sort 
shuffle writer with 40000 partitions and random partition ids.
 
 ```shell
 cd /path/to/gluten/cpp/build/velox/benchmarks
 ./generic_benchmark \
 --run-shuffle \
+--run-shuffle-read \
 --data /path/to/input_for_shuffle_write.parquet
 --shuffle-writer sort \
+--partitioning random \
+--shuffle-partitions 40000 \
 --threads 1
 ```
 
+The output should be like:
+
+```
+-------------------------------------------------------------------------------------------------------------------------
+Benchmark                                                               Time   
          CPU   Iterations UserCounters...
+-------------------------------------------------------------------------------------------------------------------------
+ShuffleWriteRead/iterations:1/process_time/real_time/threads:1 121637629714 ns 
  121309450910 ns            1 elapsed_time=121.638G read_input_time=25.2637G 
shuffle_compress_time=10.8311G shuffle_decompress_time=4.04055G 
shuffle_deserialize_time=7.24289G shuffle_spill_time=0 
shuffle_split_time=69.9098G shuffle_write_time=2.03274G
+```
+
+## Enable debug mode
+
+`spark.gluten.sql.debug` (debug mode) is set to false by default, so the 
Google glog level is limited to printing only `WARNING` or higher severity logs.
+Unless `spark.gluten.sql.debug` is set in the INI file via `--conf`, the 
logging behavior is the same as with debug mode off.
+Developers can use the `--debug-mode` command line flag to turn on debug mode when 
needed, and set the verbosity/severity level via the command line flags `--v` and 
`--minloglevel`. Note that constructing and destroying log strings can be 
very time-consuming, which may cause benchmark times to be inaccurate.
+
 ## Simulate write tasks
 
 The last operator for a write task is a file write operator, and the output 
from Velox pipeline only
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
index 26527e1c8..b3b7603ac 100644
--- 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
+++ 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
@@ -212,9 +212,13 @@ object ColumnarShuffleExchangeExec extends Logging {
   }
 
   def useSortBasedShuffle(partitioning: Partitioning, output: Seq[Attribute]): 
Boolean = {
+    val conf = GlutenConfig.getConf
+    lazy val isCelebornSortBasedShuffle = conf.isUseCelebornShuffleManager &&
+      conf.celebornShuffleWriterType == GlutenConfig.GLUTEN_SORT_SHUFFLE_WRITER
     partitioning != SinglePartition &&
     (partitioning.numPartitions >= 
GlutenConfig.getConf.columnarShuffleSortPartitionsThreshold ||
-      output.size >= GlutenConfig.getConf.columnarShuffleSortColumnsThreshold)
+      output.size >= GlutenConfig.getConf.columnarShuffleSortColumnsThreshold) 
||
+    isCelebornSortBasedShuffle
   }
 
   class DummyPairRDDWithPartitions(@transient private val sc: SparkContext, 
numPartitions: Int)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to