This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new b173657a57 [VL] Fix dump benchmark data issue in same task (#9360)
b173657a57 is described below

commit b173657a57ef8b2b93e43f0266f7563d770c72d4
Author: Yang Zhang <[email protected]>
AuthorDate: Mon Apr 28 22:13:37 2025 +0800

    [VL] Fix dump benchmark data issue in same task (#9360)
---
 .github/workflows/velox_backend.yml           |  6 +-
 cpp/core/compute/ProtobufUtils.cc             | 14 +----
 cpp/core/compute/ProtobufUtils.h              |  6 +-
 cpp/core/compute/Runtime.h                    | 17 +++---
 cpp/core/jni/JniWrapper.cc                    | 41 ++-----------
 cpp/velox/benchmarks/GenericBenchmark.cc      |  7 ++-
 cpp/velox/compute/VeloxRuntime.cc             | 88 +++++++++++++++++++--------
 cpp/velox/compute/VeloxRuntime.h              | 14 +++--
 cpp/velox/compute/WholeStageResultIterator.cc |  6 +-
 cpp/velox/jni/VeloxJniWrapper.cc              |  2 +-
 cpp/velox/tests/RuntimeTest.cc                |  8 +--
 docs/developers/MicroBenchmarks.md            | 76 ++++++++++++-----------
 12 files changed, 144 insertions(+), 141 deletions(-)

diff --git a/.github/workflows/velox_backend.yml 
b/.github/workflows/velox_backend.yml
index 6c0538aaf8..cb4aeaa845 100644
--- a/.github/workflows/velox_backend.yml
+++ b/.github/workflows/velox_backend.yml
@@ -1263,9 +1263,9 @@ jobs:
           -DtagsToInclude="org.apache.gluten.tags.GenerateExample" -Dtest=none 
-DfailIfNoTests=false -Dexec.skip
           # This test depends on files generated by the above mvn test.
           ./cpp/build/velox/benchmarks/generic_benchmark --with-shuffle 
--partitioning hash --threads 1 --iterations 1 \
-          --conf $(realpath 
backends-velox/generated-native-benchmark/conf_12_0.ini) \
-          --plan $(realpath 
backends-velox/generated-native-benchmark/plan_12_0.json) \
-          --data $(realpath 
backends-velox/generated-native-benchmark/data_12_0_0.parquet),$(realpath 
backends-velox/generated-native-benchmark/data_12_0_1.parquet)
+          --conf $(realpath 
backends-velox/generated-native-benchmark/conf_12_0_*.ini) \
+          --plan $(realpath 
backends-velox/generated-native-benchmark/plan_12_0_*.json) \
+          --data $(realpath 
backends-velox/generated-native-benchmark/data_12_0_*_0.parquet),$(realpath 
backends-velox/generated-native-benchmark/data_12_0_*_1.parquet)
       - name: Run UDF test
         run: |
           # Depends on --build_example=ON.
diff --git a/cpp/core/compute/ProtobufUtils.cc 
b/cpp/core/compute/ProtobufUtils.cc
index c41f062ac5..22967efbc1 100644
--- a/cpp/core/compute/ProtobufUtils.cc
+++ b/cpp/core/compute/ProtobufUtils.cc
@@ -63,11 +63,7 @@ std::string substraitFromJsonToPb(std::string_view typeName, 
std::string_view js
   return out;
 }
 
-std::string substraitFromPbToJson(
-    std::string_view typeName,
-    const uint8_t* data,
-    int32_t size,
-    std::optional<std::string> dumpFile) {
+std::string substraitFromPbToJson(std::string_view typeName, const uint8_t* 
data, int32_t size) {
   std::string typeUrl = "/substrait." + std::string(typeName);
 
   google::protobuf::io::ArrayInputStream bufStream{data, size};
@@ -79,14 +75,6 @@ std::string substraitFromPbToJson(
   if (!status.ok()) {
     throw GlutenException("BinaryToJsonStream returned " + status.ToString());
   }
-
-  if (dumpFile.has_value()) {
-    std::ofstream outFile(*dumpFile);
-    if (!outFile.is_open()) {
-      LOG(ERROR) << "Failed to open file for writing: " << *dumpFile;
-    }
-    outFile << out << std::endl;
-  }
   return out;
 }
 
diff --git a/cpp/core/compute/ProtobufUtils.h b/cpp/core/compute/ProtobufUtils.h
index 849f88f039..a4207dd786 100644
--- a/cpp/core/compute/ProtobufUtils.h
+++ b/cpp/core/compute/ProtobufUtils.h
@@ -27,10 +27,6 @@ bool parseProtobuf(const uint8_t* buf, int bufLen, 
google::protobuf::Message* ms
 
 std::string substraitFromJsonToPb(std::string_view typeName, std::string_view 
json);
 
-std::string substraitFromPbToJson(
-    std::string_view typeName,
-    const uint8_t* data,
-    int32_t size,
-    std::optional<std::string> dumpFile);
+std::string substraitFromPbToJson(std::string_view typeName, const uint8_t* 
data, int32_t size);
 
 } // namespace gluten
diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index 16acada54f..ddb233541f 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -42,13 +42,16 @@ struct SparkTaskInfo {
   int32_t partitionId{0};
   // Same as TID.
   int64_t taskId{0};
+  // virtual id for each backend internal use
+  int32_t vId{0};
 
   std::string toString() const {
-    return "[Stage: " + std::to_string(stageId) + " TID: " + 
std::to_string(taskId) + "]";
+    return "[Stage: " + std::to_string(stageId) + " TID: " + 
std::to_string(taskId) + " VID: " + std::to_string(vId) +
+        "]";
   }
 
   friend std::ostream& operator<<(std::ostream& os, const SparkTaskInfo& 
taskInfo) {
-    os << "[Stage: " << taskInfo.stageId << " TID: " << taskInfo.taskId << "]";
+    os << taskInfo.toString();
     return os;
   }
 };
@@ -80,11 +83,11 @@ class Runtime : public 
std::enable_shared_from_this<Runtime> {
     return kind_;
   }
 
-  virtual void parsePlan(const uint8_t* data, int32_t size, 
std::optional<std::string> dumpFile) {
+  virtual void parsePlan(const uint8_t* data, int32_t size, bool dumpPlan) {
     throw GlutenException("Not implemented");
   }
 
-  virtual void parseSplitInfo(const uint8_t* data, int32_t size, 
std::optional<std::string> dumpFile) {
+  virtual void parseSplitInfo(const uint8_t* data, int32_t size, int32_t idx, 
bool dumpSplit) {
     throw GlutenException("Not implemented");
   }
 
@@ -147,11 +150,11 @@ class Runtime : public 
std::enable_shared_from_this<Runtime> {
     throw GlutenException("Not implemented");
   }
 
-  virtual void dumpConf(const std::string& path) {
+  virtual void dumpConf(bool dump) {
     throw GlutenException("Not implemented");
   }
 
-  virtual std::shared_ptr<ArrowWriter> createArrowWriter(const std::string& 
path) {
+  virtual std::shared_ptr<ArrowWriter> createArrowWriter(bool dumpData, 
int32_t idx) {
     throw GlutenException("Not implemented");
   };
 
@@ -159,7 +162,7 @@ class Runtime : public 
std::enable_shared_from_this<Runtime> {
     return confMap_;
   }
 
-  void setSparkTaskInfo(SparkTaskInfo taskInfo) {
+  virtual void setSparkTaskInfo(SparkTaskInfo taskInfo) {
     taskInfo_ = taskInfo;
   }
 
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 965bb2a7c4..ecc816a79b 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -375,7 +375,7 @@ JNIEXPORT jstring JNICALL 
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrap
   auto planData = safeArray.elems();
   auto planSize = env->GetArrayLength(planArray);
   auto ctx = getRuntime(env, wrapper);
-  ctx->parsePlan(planData, planSize, std::nullopt);
+  ctx->parsePlan(planData, planSize, false);
   auto& conf = ctx->getConfMap();
   auto planString = ctx->planString(details, conf);
   return env->NewStringUTF(planString.c_str());
@@ -414,58 +414,27 @@ 
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith
 
   ctx->setSparkTaskInfo({stageId, partitionId, taskId});
 
-  std::string saveDir{};
-  std::string fileIdentifier = "_" + std::to_string(stageId) + "_" + 
std::to_string(partitionId);
-  if (saveInput) {
-    if (conf.find(kGlutenSaveDir) == conf.end()) {
-      throw GlutenException(kGlutenSaveDir + " is not configured.");
-    }
-    saveDir = conf.at(kGlutenSaveDir);
-    std::filesystem::path f{saveDir};
-    if (std::filesystem::exists(f)) {
-      if (!std::filesystem::is_directory(f)) {
-        throw GlutenException("Invalid path for " + kGlutenSaveDir + ": " + 
saveDir);
-      }
-    } else {
-      std::error_code ec;
-      std::filesystem::create_directory(f, ec);
-      if (ec) {
-        throw GlutenException("Failed to create directory: " + saveDir + ", 
error message: " + ec.message());
-      }
-    }
-    ctx->dumpConf(saveDir + "/conf" + fileIdentifier + ".ini");
-  }
+  ctx->dumpConf(saveInput);
 
   auto spillDirStr = jStringToCString(env, spillDir);
 
   auto safePlanArray = getByteArrayElementsSafe(env, planArr);
   auto planSize = env->GetArrayLength(planArr);
-  ctx->parsePlan(
-      safePlanArray.elems(),
-      planSize,
-      saveInput ? std::optional<std::string>(saveDir + "/plan" + 
fileIdentifier + ".json") : std::nullopt);
+  ctx->parsePlan(safePlanArray.elems(), planSize, saveInput);
 
   for (jsize i = 0, splitInfoArraySize = env->GetArrayLength(splitInfosArr); i 
< splitInfoArraySize; i++) {
     jbyteArray splitInfoArray = 
static_cast<jbyteArray>(env->GetObjectArrayElement(splitInfosArr, i));
     jsize splitInfoSize = env->GetArrayLength(splitInfoArray);
     auto safeSplitArray = getByteArrayElementsSafe(env, splitInfoArray);
     auto splitInfoData = safeSplitArray.elems();
-    ctx->parseSplitInfo(
-        splitInfoData,
-        splitInfoSize,
-        saveInput ? std::optional<std::string>(saveDir + "/split" + 
fileIdentifier + "_" + std::to_string(i) + ".json")
-                  : std::nullopt);
+    ctx->parseSplitInfo(splitInfoData, splitInfoSize, i, saveInput);
   }
 
   // Handle the Java iters
   jsize itersLen = env->GetArrayLength(iterArr);
   std::vector<std::shared_ptr<ResultIterator>> inputIters;
   for (int idx = 0; idx < itersLen; idx++) {
-    std::shared_ptr<ArrowWriter> writer = nullptr;
-    if (saveInput) {
-      auto file = saveDir + "/data" + fileIdentifier + "_" + 
std::to_string(idx) + ".parquet";
-      writer = ctx->createArrowWriter(file);
-    }
+    auto writer = ctx->createArrowWriter(saveInput, idx);
     jobject iter = env->GetObjectArrayElement(iterArr, idx);
     auto arrayIter = makeJniColumnarBatchIterator(env, iter, ctx, writer);
     auto resultIter = std::make_shared<ResultIterator>(std::move(arrayIter));
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc 
b/cpp/velox/benchmarks/GenericBenchmark.cc
index 1a48ca815d..b73840fbaf 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -437,9 +437,10 @@ auto BM_Generic = [](::benchmark::State& state,
             });
       }
       *Runtime::localWriteFilesTempPath() = FLAGS_write_path;
-      runtime->parsePlan(reinterpret_cast<uint8_t*>(plan.data()), plan.size(), 
std::nullopt);
-      for (auto& split : splits) {
-        runtime->parseSplitInfo(reinterpret_cast<uint8_t*>(split.data()), 
split.size(), std::nullopt);
+      runtime->parsePlan(reinterpret_cast<uint8_t*>(plan.data()), plan.size(), 
false);
+      for (auto i = 0; i < splits.size(); i++) {
+        auto split = splits[i];
+        runtime->parseSplitInfo(reinterpret_cast<uint8_t*>(split.data()), 
split.size(), i, false);
       }
 
       auto resultIter = runtime->createResultIterator(veloxSpillDir, 
std::move(inputIters), runtime->getConfMap());
diff --git a/cpp/velox/compute/VeloxRuntime.cc 
b/cpp/velox/compute/VeloxRuntime.cc
index e2309886a2..a328e9b22d 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -57,6 +57,34 @@ 
DECLARE_bool(velox_memory_pool_capacity_transfer_across_tasks);
 using namespace facebook;
 
 namespace gluten {
+namespace {
+void dumpToStorage(
+    const std::shared_ptr<facebook::velox::config::ConfigBase>& conf,
+    const std::string& dumpFile,
+    const std::string content) {
+  auto saveDir = conf->get<std::string>(kGlutenSaveDir).value();
+  std::filesystem::path f{saveDir};
+  if (std::filesystem::exists(f)) {
+    if (!std::filesystem::is_directory(f)) {
+      throw GlutenException("Invalid path for " + kGlutenSaveDir + ": " + 
saveDir);
+    }
+  } else {
+    std::error_code ec;
+    std::filesystem::create_directory(f, ec);
+    if (ec) {
+      throw GlutenException("Failed to create directory: " + saveDir + ", 
error message: " + ec.message());
+    }
+  }
+  std::string dumpPath = f / dumpFile;
+  std::ofstream outFile(dumpPath);
+  if (!outFile.is_open()) {
+    LOG(ERROR) << "Failed to open file for writing: " << dumpPath;
+    return;
+  }
+  outFile << content;
+  outFile.close();
+}
+} // namespace
 
 VeloxRuntime::VeloxRuntime(
     const std::string& kind,
@@ -78,10 +106,14 @@ VeloxRuntime::VeloxRuntime(
       kMemoryPoolCapacityTransferAcrossTasks, 
FLAGS_velox_memory_pool_capacity_transfer_across_tasks);
 }
 
-void VeloxRuntime::parsePlan(const uint8_t* data, int32_t size, 
std::optional<std::string> dumpFile) {
-  if (debugModeEnabled_ || dumpFile.has_value()) {
+void VeloxRuntime::parsePlan(const uint8_t* data, int32_t size, bool dumpPlan) 
{
+  if (debugModeEnabled_ || dumpPlan) {
     try {
-      auto planJson = substraitFromPbToJson("Plan", data, size, dumpFile);
+      auto planJson = substraitFromPbToJson("Plan", data, size);
+      if (dumpPlan) {
+        auto dumpFile = fmt::format("plan_{}_{}_{}.json", taskInfo_.stageId, 
taskInfo_.partitionId, taskInfo_.vId);
+        dumpToStorage(veloxCfg_, dumpFile, planJson);
+      }
       LOG_IF(INFO, debugModeEnabled_) << std::string(50, '#') << " received 
substrait::Plan: " << taskInfo_ << std::endl
                                       << planJson;
     } catch (const std::exception& e) {
@@ -92,10 +124,15 @@ void VeloxRuntime::parsePlan(const uint8_t* data, int32_t 
size, std::optional<st
   GLUTEN_CHECK(parseProtobuf(data, size, &substraitPlan_) == true, "Parse 
substrait plan failed");
 }
 
-void VeloxRuntime::parseSplitInfo(const uint8_t* data, int32_t size, 
std::optional<std::string> dumpFile) {
-  if (debugModeEnabled_ || dumpFile.has_value()) {
+void VeloxRuntime::parseSplitInfo(const uint8_t* data, int32_t size, int32_t 
idx, bool dumpSplit) {
+  if (debugModeEnabled_ || dumpSplit) {
     try {
-      auto splitJson = substraitFromPbToJson("ReadRel.LocalFiles", data, size, 
dumpFile);
+      auto splitJson = substraitFromPbToJson("ReadRel.LocalFiles", data, size);
+      if (dumpSplit) {
+        auto dumpFile =
+            fmt::format("split_{}_{}_{}_{}.json", taskInfo_.stageId, 
taskInfo_.partitionId, taskInfo_.vId, idx);
+        dumpToStorage(veloxCfg_, dumpFile, splitJson);
+      }
       LOG_IF(INFO, debugModeEnabled_) << std::string(50, '#')
                                       << " received 
substrait::ReadRel.LocalFiles: " << taskInfo_ << std::endl
                                       << splitJson;
@@ -283,7 +320,11 @@ std::unique_ptr<ColumnarBatchSerializer> 
VeloxRuntime::createColumnarBatchSerial
   return std::make_unique<VeloxColumnarBatchSerializer>(arrowPool, veloxPool, 
cSchema);
 }
 
-void VeloxRuntime::dumpConf(const std::string& path) {
+void VeloxRuntime::dumpConf(bool dump) {
+  if (!dump) {
+    return;
+  }
+
   const auto& backendConfMap = 
VeloxBackend::get()->getBackendConf()->rawConfigs();
   auto allConfMap = backendConfMap;
 
@@ -291,13 +332,7 @@ void VeloxRuntime::dumpConf(const std::string& path) {
     allConfMap.insert_or_assign(pair.first, pair.second);
   }
 
-  // Open file "velox.conf" for writing, automatically creating it if it 
doesn't exist,
-  // or overwriting it if it does.
-  std::ofstream outFile(path);
-  if (!outFile.is_open()) {
-    LOG(ERROR) << "Failed to open file for writing: " << path;
-    return;
-  }
+  std::stringstream out;
 
   // Calculate the maximum key length for alignment.
   size_t maxKeyLength = 0;
@@ -306,24 +341,29 @@ void VeloxRuntime::dumpConf(const std::string& path) {
   }
 
   // Write each key-value pair to the file with adjusted spacing for alignment
-  outFile << "[Backend Conf]" << std::endl;
+  out << "[Backend Conf]" << std::endl;
   for (const auto& pair : backendConfMap) {
-    outFile << std::left << std::setw(maxKeyLength + 1) << pair.first << ' ' 
<< pair.second << std::endl;
+    out << std::left << std::setw(maxKeyLength + 1) << pair.first << ' ' << 
pair.second << std::endl;
   }
-  outFile << std::endl << "[Session Conf]" << std::endl;
+  out << std::endl << "[Session Conf]" << std::endl;
   for (const auto& pair : confMap_) {
-    outFile << std::left << std::setw(maxKeyLength + 1) << pair.first << ' ' 
<< pair.second << std::endl;
+    out << std::left << std::setw(maxKeyLength + 1) << pair.first << ' ' << 
pair.second << std::endl;
   }
 
-  outFile.close();
+  auto dumpPath = fmt::format("conf_{}_{}_{}.ini", taskInfo_.stageId, 
taskInfo_.partitionId, taskInfo_.vId);
+  dumpToStorage(veloxCfg_, dumpPath, out.str());
 }
 
-std::shared_ptr<ArrowWriter> VeloxRuntime::createArrowWriter(const 
std::string& path) {
-  int64_t batchSize = 4096;
-  if (auto it = confMap_.find(kSparkBatchSize); it != confMap_.end()) {
-    batchSize = std::atol(it->second.c_str());
+std::shared_ptr<ArrowWriter> VeloxRuntime::createArrowWriter(bool dumpData, 
int32_t idx) {
+  if (!dumpData) {
+    return nullptr;
   }
-  return std::make_shared<VeloxArrowWriter>(path, batchSize, 
memoryManager()->getLeafMemoryPool().get());
+
+  auto saveDir = veloxCfg_->get<std::string>(kGlutenSaveDir).value();
+  auto dumpPath =
+      fmt::format("{}/data_{}_{}_{}_{}.parquet", saveDir, taskInfo_.stageId, 
taskInfo_.partitionId, taskInfo_.vId, idx);
+  auto batchSize = veloxCfg_->get<int64_t>(kSparkBatchSize, 4096);
+  return std::make_shared<VeloxArrowWriter>(dumpPath, batchSize, 
memoryManager()->getLeafMemoryPool().get());
 }
 
 } // namespace gluten
diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h
index 798fa5bc72..163e09b2ea 100644
--- a/cpp/velox/compute/VeloxRuntime.h
+++ b/cpp/velox/compute/VeloxRuntime.h
@@ -35,9 +35,15 @@ class VeloxRuntime final : public Runtime {
       VeloxMemoryManager* vmm,
       const std::unordered_map<std::string, std::string>& confMap);
 
-  void parsePlan(const uint8_t* data, int32_t size, std::optional<std::string> 
dumpFile) override;
+  void setSparkTaskInfo(SparkTaskInfo taskInfo) override {
+    static std::atomic<uint32_t> vtId{0};
+    taskInfo_ = taskInfo;
+    taskInfo_.vId = vtId++;
+  }
+
+  void parsePlan(const uint8_t* data, int32_t size, bool dumpPlan) override;
 
-  void parseSplitInfo(const uint8_t* data, int32_t size, 
std::optional<std::string> dumpFile) override;
+  void parseSplitInfo(const uint8_t* data, int32_t size, int32_t idx, bool 
dumpSplitInfo) override;
 
   VeloxMemoryManager* memoryManager() override;
 
@@ -74,9 +80,9 @@ class VeloxRuntime final : public Runtime {
 
   std::string planString(bool details, const std::unordered_map<std::string, 
std::string>& sessionConf) override;
 
-  void dumpConf(const std::string& path) override;
+  void dumpConf(bool dump) override;
 
-  std::shared_ptr<ArrowWriter> createArrowWriter(const std::string& path) 
override;
+  std::shared_ptr<ArrowWriter> createArrowWriter(bool dumpData, int32_t idx) 
override;
 
   std::shared_ptr<VeloxDataSource> createDataSource(const std::string& 
filePath, std::shared_ptr<arrow::Schema> schema);
 
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc 
b/cpp/velox/compute/WholeStageResultIterator.cc
index 666610d907..79d7669737 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -82,13 +82,12 @@ WholeStageResultIterator::WholeStageResultIterator(
   std::unordered_set<velox::core::PlanNodeId> emptySet;
   velox::core::PlanFragment planFragment{planNode, 
velox::core::ExecutionStrategy::kUngrouped, 1, emptySet};
   std::shared_ptr<velox::core::QueryCtx> queryCtx = createNewVeloxQueryCtx();
-  static std::atomic<uint32_t> vtId{0}; // Velox task ID to distinguish from 
Spark task ID.
   task_ = velox::exec::Task::create(
       fmt::format(
           "Gluten_Stage_{}_TID_{}_VTID_{}",
           std::to_string(taskInfo_.stageId),
           std::to_string(taskInfo_.taskId),
-          std::to_string(vtId++)),
+          std::to_string(taskInfo.vId)),
       std::move(planFragment),
       0,
       std::move(queryCtx),
@@ -179,7 +178,6 @@ WholeStageResultIterator::WholeStageResultIterator(
 std::shared_ptr<velox::core::QueryCtx> 
WholeStageResultIterator::createNewVeloxQueryCtx() {
   std::unordered_map<std::string, std::shared_ptr<velox::config::ConfigBase>> 
connectorConfigs;
   connectorConfigs[kHiveConnectorId] = createConnectorConfig();
-  static std::atomic<uint32_t> vqId{0}; // Velox query ID, same with taskId.
   std::shared_ptr<velox::core::QueryCtx> ctx = velox::core::QueryCtx::create(
       nullptr,
       facebook::velox::core::QueryConfig{getQueryContextConf()},
@@ -191,7 +189,7 @@ std::shared_ptr<velox::core::QueryCtx> 
WholeStageResultIterator::createNewVeloxQ
           "Gluten_Stage_{}_TID_{}_VTID_{}",
           std::to_string(taskInfo_.stageId),
           std::to_string(taskInfo_.taskId),
-          std::to_string(vqId++)));
+          std::to_string(taskInfo_.vId)));
   return ctx;
 }
 
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 25e946b68b..c5f1d2ded9 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -138,7 +138,7 @@ 
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeValidateWithFail
   const auto runtime = dynamic_cast<VeloxRuntime*>(ctx);
   if (runtime->debugModeEnabled()) {
     try {
-      const auto jsonPlan = substraitFromPbToJson("Plan", planData, planSize, 
std::nullopt);
+      const auto jsonPlan = substraitFromPbToJson("Plan", planData, planSize);
       LOG(INFO) << std::string(50, '#') << " received substrait::Plan: for 
validation";
       LOG(INFO) << jsonPlan;
     } catch (const std::exception& e) {
diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc
index 212b0201ed..e15d3bdb27 100644
--- a/cpp/velox/tests/RuntimeTest.cc
+++ b/cpp/velox/tests/RuntimeTest.cc
@@ -50,9 +50,9 @@ class DummyRuntime final : public Runtime {
       const std::unordered_map<std::string, std::string>& conf)
       : Runtime(kind, mm, conf) {}
 
-  void parsePlan(const uint8_t* data, int32_t size, std::optional<std::string> 
dumpFile) override {}
+  void parsePlan(const uint8_t* data, int32_t size, bool dump) override {}
 
-  void parseSplitInfo(const uint8_t* data, int32_t size, 
std::optional<std::string> dumpFile) override {}
+  void parseSplitInfo(const uint8_t* data, int32_t size, int32_t idx, bool 
dump) override {}
 
   std::shared_ptr<ResultIterator> createResultIterator(
       const std::string& spillDir,
@@ -99,11 +99,11 @@ class DummyRuntime final : public Runtime {
     throw GlutenException("Not yet implemented");
   }
 
-  void dumpConf(const std::string& path) override {
+  void dumpConf(bool dump) override {
     throw GlutenException("Not yet implemented");
   }
 
-  std::shared_ptr<ArrowWriter> createArrowWriter(const std::string& path) 
override {
+  std::shared_ptr<ArrowWriter> createArrowWriter(bool dump, int32_t) override {
     throw GlutenException("Not yet implemented");
   }
 
diff --git a/docs/developers/MicroBenchmarks.md 
b/docs/developers/MicroBenchmarks.md
index ccb01d4737..2121d2a1de 100644
--- a/docs/developers/MicroBenchmarks.md
+++ b/docs/developers/MicroBenchmarks.md
@@ -43,16 +43,17 @@ mvn test -Pspark-3.2 -Pbackends-velox -pl backends-velox 
-am \
 
 The generated example files are placed in gluten/backends-velox:
 
+- stageId means the Spark stage id shown in the web UI.
+- partitionId means the Spark partition id shown on the stage page of the web UI.
+- vId means the backend's internal virtual id.
+
 ```shell
 $ tree gluten/backends-velox/generated-native-benchmark/
 gluten/backends-velox/generated-native-benchmark/
-├── example.json
-├── example_lineitem
-│   ├── part-00000-3ec19189-d20e-4240-85ae-88631d46b612-c000.snappy.parquet
-│   └── _SUCCESS
-└── example_orders
-    ├── part-00000-1e66fb98-4dd6-47a6-8679-8625dbc437ee-c000.snappy.parquet
-    └── _SUCCESS
+├── conf_12_10_3.ini
+├── data_12_10_3_0.parquet
+├── data_12_10_3_1.parquet
+├── plan_12_10_3.json
 ```
 
 Run micro benchmark with the generated files as input. You need to specify the 
**absolute** path to
@@ -61,9 +62,10 @@ the input files:
 ```shell
 cd /path/to/gluten/cpp/build/velox/benchmarks
 ./generic_benchmark \
---plan 
/home/sparkuser/github/apache/incubator-gluten/backends-velox/generated-native-benchmark/example.json
 \
---data 
/home/sparkuser/github/apache/incubator-gluten/backends-velox/generated-native-benchmark/example_orders/part-00000-1e66fb98-4dd6-47a6-8679-8625dbc437ee-c000.snappy.parquet,\
-/home/sparkuser/github/apache/incubator-gluten/backends-velox/generated-native-benchmark/example_lineitem/part-00000-3ec19189-d20e-4240-85ae-88631d46b612-c000.snappy.parquet
 \
+--plan 
<path-to-gluten>/backends-velox/generated-native-benchmark/plan_{stageId}_{partitionId}_{vId}.json
 \
+--data 
<path-to-gluten>/backends-velox/generated-native-benchmark/data_{stageId}_{partitionId}_{vId}_{iteratorIdx}.parquet,\
+<path-to-gluten>/backends-velox/generated-native-benchmark/data_{stageId}_{partitionId}_{vId}_{iteratorIdx}.parquet
 \
+--conf 
<path-to-gluten>/backends-velox/generated-native-benchmark/conf_{stageId}_{partitionId}_{vId}.ini
 \
 --threads 1 --iterations 1 --noprint-result
 ```
 
@@ -128,15 +130,15 @@ And then re-run the query with below configurations to 
dump the inputs to micro
 Check the files in `spark.gluten.saveDir`. If the simulated stage is a first 
stage, you will get 3
 or 4 types of dumped file:
 
-- Configuration file: INI formatted, file name 
`conf_[stageId]_[partitionId].ini`. Contains the
+- Configuration file: INI formatted, file name 
`conf_{stageId}_{partitionId}_{vId}.ini`. Contains the
   configurations to init Velox backend and runtime session.
-- Plan file: JSON formatted, file name `plan_[stageId]_[partitionId].json`. 
Contains the substrait
+- Plan file: JSON formatted, file name 
`plan_{stageId}_{partitionId}_{vId}.json`. Contains the substrait
   plan to the stage, without input file splits.
-- Split file: JSON formatted, file name 
`split_[stageId]_[partitionId]_[splitIndex].json`. There can
+- Split file: JSON formatted, file name 
`split_{stageId}_{partitionId}_{vId}_{splitIdx}.json`. There can
   be more than one split file in a first stage task. Contains the substrait 
plan piece to the input
   file splits.
 - Data file(optional): Parquet formatted, file
-  name `data_[stageId]_[partitionId]_[iteratorIndex].parquet`. If the first 
stage contains one or
+  name `data_{stageId}_{partitionId}_{vId}_{iteratorIdx}.parquet`. If the 
first stage contains one or
   more BHJ operators, there can be one or more input data files. The input 
data files of a first
   stage will be loaded as iterators to serve as the inputs for the pipeline:
 
@@ -158,29 +160,29 @@ Sample command:
 ```shell
 cd /path/to/gluten/cpp/build/velox/benchmarks
 ./generic_benchmark \
---conf /absolute_path/to/conf_[stageId]_[partitionId].ini \
---plan /absolute_path/to/plan_[stageId]_[partitionId].json \
---split 
/absolut_path/to/split_[stageId]_[partitionId]_0.json,/absolut_path/to/split_[stageId]_[partitionId]_1.json
 \
+--conf /absolute_path/to/conf_{stageId}_{partitionId}_{vId}.ini \
+--plan /absolute_path/to/plan_{stageId}_{partitionId}_{vId}.json \
+--split 
/absolut_path/to/split_{stageId}_{partitionId}_{vId}_{splitIdx}.json,/absolut_path/to/split_{stageId}_{partitionId}_{vId}_{splitIdx}.json
 \
 --threads 1 --noprint-result
 
 # If the stage requires data files, use --data-file to specify the absolute 
path.
 cd /path/to/gluten/cpp/build/velox/benchmarks
 ./generic_benchmark \
---conf /absolute_path/to/conf_[stageId]_[partitionId].ini \
---plan /absolute_path/to/plan_[stageId]_[partitionId].json \
---split 
/absolut_path/to/split_[stageId]_[partitionId]_0.json,/absolut_path/to/split_[stageId]_[partitionId]_1.json
 \
---data 
/absolut_path/to/data_[stageId]_[partitionId]_0.parquet,/absolut_path/to/data_[stageId]_[partitionId]_1.parquet
 \
+--conf /absolute_path/to/conf_{stageId}_{partitionId}_{vId}.ini \
+--plan /absolute_path/to/plan_{stageId}_{partitionId}_{vId}.json \
+--split 
/absolut_path/to/split_{stageId}_{partitionId}_{vId}_{splitIdx}.json,/absolut_path/to/split_{stageId}_{partitionId}_{vId}_{splitIdx}.json
 \
+--data 
/absolut_path/to/data_{stageId}_{partitionId}_{vId}_{iteratorIdx}.parquet,/absolut_path/to/data_{stageId}_{partitionId}_{vId}_{iteratorIdx}.parquet
 \
 --threads 1 --noprint-result
 ```
 
 If the simulated stage is a middle stage, which means pure shuffle stage, you 
will get 3 types of
 dumped file:
 
-- Configuration file: INI formatted, file name 
`conf_[stageId]_[partitionId].ini`. Contains the
+- Configuration file: INI formatted, file name 
`conf_{stageId}_{partitionId}_{vId}.ini`. Contains the
   configurations to init Velox backend and runtime session.
-- Plan file: JSON formatted, file name `plan_[stageId]_[partitionId].json`. 
Contains the substrait
+- Plan file: JSON formatted, file name 
`plan_{stageId}_{partitionId}_{vId}.json`. Contains the substrait
   plan to the stage.
-- Data file: Parquet formatted, file name 
`data_[stageId]_[partitionId]_[iteratorIndex].parquet`.
+- Data file: Parquet formatted, file name 
`data_{stageId}_{partitionId}_{vId}_{iteratorIdx}.parquet`.
   There can be more than one input data file in a middle stage task. The input 
data files of a
   middle stage will be loaded as iterators to serve as the inputs for the 
pipeline:
 
@@ -199,22 +201,22 @@ Sample command:
 ```shell
 cd /path/to/gluten/cpp/build/velox/benchmarks
 ./generic_benchmark \
---conf /absolute_path/to/conf_[stageId]_[partitionId].ini \
---plan /absolute_path/to/plan_[stageId]_[partitionId].json \
---data 
/absolut_path/to/data_[stageId]_[partitionId]_0.parquet,/absolut_path/to/data_[stageId]_[partitionId]_1.parquet
 \
+--conf /absolute_path/to/conf_{stageId}_{partitionId}_{vId}.ini \
+--plan /absolute_path/to/plan_{stageId}_{partitionId}_{vId}.json \
+--data 
/absolut_path/to/data_{stageId}_{partitionId}_{vId}_{iteratorIdx}.parquet,/absolut_path/to/data_{stageId}_{partitionId}_{vId}_{iteratorIdx}.parquet
 \
 --threads 1 --noprint-result
 ```
 
 For some complex queries, stageId may cannot represent the Substrait plan 
input, please get the
 taskId from spark UI, and get your target parquet from saveDir.
 
-In this example, only one partition input with partition id 2, taskId is 36, 
iterator length is 2.
+In this example, there is only one partition input: stageId is 3, 
partitionId is 36, vId is 0, and the iterator length is 2.
 
 ```shell
 cd /path/to/gluten/cpp/build/velox/benchmarks
 ./generic_benchmark \
---plan /absolute_path/to/complex_plan.json \
---data 
/absolute_path/to/data_36_2_0.parquet,/absolute_path/to/data_36_2_1.parquet \
+--plan /absolute_path/to/plan_3_36_0.json \
+--data 
/absolute_path/to/data_3_36_0_0.parquet,/absolute_path/to/data_3_36_0_1.parquet 
\
 --threads 1 --noprint-result
 ```
 
@@ -229,8 +231,8 @@ details.
 ```shell
 cd /path/to/gluten/cpp/build/velox/benchmarks
 ./generic_benchmark \
---plan /absolute_path/to/plan.json \
---data /absolute_path/to/data.parquet
+--plan /absolute_path/to/plan_{stageId}_{partitionId}_{vId}.json \
+--data 
/absolute_path/to/data_{stageId}_{partitionId}_{vId}_{iteratorIdx}.parquet
 --threads 1 --noprint-result --save-output /absolute_path/to/result.parquet
 ```
 
@@ -245,8 +247,8 @@ details.
 ```shell
 cd /path/to/gluten/cpp/build/velox/benchmarks
 ./generic_benchmark \
---plan /absolute_path/to/plan.json \
---split /absolute_path/to/split.json \
+--plan /absolute_path/to/plan_{stageId}_{partitionId}_{vId}.json \
+--split /absolute_path/to/split_{stageId}_{partitionId}_{vId}_{splitIdx}.json \
 --threads 1 --noprint-result --with-shuffle
 ```
 
@@ -311,9 +313,9 @@ General configurations for shuffle write:
 ```shell
 cd /path/to/gluten/cpp/build/velox/benchmarks
 ./generic_benchmark \
---plan /path/to/saveDir/plan_1_0.json \
---conf /path/to/saveDir/conf_1_0.ini \
---split /path/to/saveDir/split_1_0_0.json \
+--plan /path/to/saveDir/plan_{stageId}_{partitionId}_{vId}.json \
+--conf /path/to/saveDir/conf_{stageId}_{partitionId}_{vId}.ini \
+--split /path/to/saveDir/split_{stageId}_{partitionId}_{vId}_{splitIdx}.json \
 --with-shuffle \
 --shuffle-writer sort \
 --partitioning hash \


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to