This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b173657a57 [VL] Fix dump benchmark data issue in same task (#9360)
b173657a57 is described below
commit b173657a57ef8b2b93e43f0266f7563d770c72d4
Author: Yang Zhang <[email protected]>
AuthorDate: Mon Apr 28 22:13:37 2025 +0800
[VL] Fix dump benchmark data issue in same task (#9360)
---
.github/workflows/velox_backend.yml | 6 +-
cpp/core/compute/ProtobufUtils.cc | 14 +----
cpp/core/compute/ProtobufUtils.h | 6 +-
cpp/core/compute/Runtime.h | 17 +++---
cpp/core/jni/JniWrapper.cc | 41 ++-----------
cpp/velox/benchmarks/GenericBenchmark.cc | 7 ++-
cpp/velox/compute/VeloxRuntime.cc | 88 +++++++++++++++++++--------
cpp/velox/compute/VeloxRuntime.h | 14 +++--
cpp/velox/compute/WholeStageResultIterator.cc | 6 +-
cpp/velox/jni/VeloxJniWrapper.cc | 2 +-
cpp/velox/tests/RuntimeTest.cc | 8 +--
docs/developers/MicroBenchmarks.md | 76 ++++++++++++-----------
12 files changed, 144 insertions(+), 141 deletions(-)
diff --git a/.github/workflows/velox_backend.yml
b/.github/workflows/velox_backend.yml
index 6c0538aaf8..cb4aeaa845 100644
--- a/.github/workflows/velox_backend.yml
+++ b/.github/workflows/velox_backend.yml
@@ -1263,9 +1263,9 @@ jobs:
-DtagsToInclude="org.apache.gluten.tags.GenerateExample" -Dtest=none
-DfailIfNoTests=false -Dexec.skip
# This test depends on files generated by the above mvn test.
./cpp/build/velox/benchmarks/generic_benchmark --with-shuffle
--partitioning hash --threads 1 --iterations 1 \
- --conf $(realpath
backends-velox/generated-native-benchmark/conf_12_0.ini) \
- --plan $(realpath
backends-velox/generated-native-benchmark/plan_12_0.json) \
- --data $(realpath
backends-velox/generated-native-benchmark/data_12_0_0.parquet),$(realpath
backends-velox/generated-native-benchmark/data_12_0_1.parquet)
+ --conf $(realpath
backends-velox/generated-native-benchmark/conf_12_0_*.ini) \
+ --plan $(realpath
backends-velox/generated-native-benchmark/plan_12_0_*.json) \
+ --data $(realpath
backends-velox/generated-native-benchmark/data_12_0_*_0.parquet),$(realpath
backends-velox/generated-native-benchmark/data_12_0_*_1.parquet)
- name: Run UDF test
run: |
# Depends on --build_example=ON.
diff --git a/cpp/core/compute/ProtobufUtils.cc
b/cpp/core/compute/ProtobufUtils.cc
index c41f062ac5..22967efbc1 100644
--- a/cpp/core/compute/ProtobufUtils.cc
+++ b/cpp/core/compute/ProtobufUtils.cc
@@ -63,11 +63,7 @@ std::string substraitFromJsonToPb(std::string_view typeName,
std::string_view js
return out;
}
-std::string substraitFromPbToJson(
- std::string_view typeName,
- const uint8_t* data,
- int32_t size,
- std::optional<std::string> dumpFile) {
+std::string substraitFromPbToJson(std::string_view typeName, const uint8_t*
data, int32_t size) {
std::string typeUrl = "/substrait." + std::string(typeName);
google::protobuf::io::ArrayInputStream bufStream{data, size};
@@ -79,14 +75,6 @@ std::string substraitFromPbToJson(
if (!status.ok()) {
throw GlutenException("BinaryToJsonStream returned " + status.ToString());
}
-
- if (dumpFile.has_value()) {
- std::ofstream outFile(*dumpFile);
- if (!outFile.is_open()) {
- LOG(ERROR) << "Failed to open file for writing: " << *dumpFile;
- }
- outFile << out << std::endl;
- }
return out;
}
diff --git a/cpp/core/compute/ProtobufUtils.h b/cpp/core/compute/ProtobufUtils.h
index 849f88f039..a4207dd786 100644
--- a/cpp/core/compute/ProtobufUtils.h
+++ b/cpp/core/compute/ProtobufUtils.h
@@ -27,10 +27,6 @@ bool parseProtobuf(const uint8_t* buf, int bufLen,
google::protobuf::Message* ms
std::string substraitFromJsonToPb(std::string_view typeName, std::string_view
json);
-std::string substraitFromPbToJson(
- std::string_view typeName,
- const uint8_t* data,
- int32_t size,
- std::optional<std::string> dumpFile);
+std::string substraitFromPbToJson(std::string_view typeName, const uint8_t*
data, int32_t size);
} // namespace gluten
diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index 16acada54f..ddb233541f 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -42,13 +42,16 @@ struct SparkTaskInfo {
int32_t partitionId{0};
// Same as TID.
int64_t taskId{0};
+ // virtual id for each backend internal use
+ int32_t vId{0};
std::string toString() const {
- return "[Stage: " + std::to_string(stageId) + " TID: " +
std::to_string(taskId) + "]";
+ return "[Stage: " + std::to_string(stageId) + " TID: " +
std::to_string(taskId) + " VID: " + std::to_string(vId) +
+ "]";
}
friend std::ostream& operator<<(std::ostream& os, const SparkTaskInfo&
taskInfo) {
- os << "[Stage: " << taskInfo.stageId << " TID: " << taskInfo.taskId << "]";
+ os << taskInfo.toString();
return os;
}
};
@@ -80,11 +83,11 @@ class Runtime : public
std::enable_shared_from_this<Runtime> {
return kind_;
}
- virtual void parsePlan(const uint8_t* data, int32_t size,
std::optional<std::string> dumpFile) {
+ virtual void parsePlan(const uint8_t* data, int32_t size, bool dumpPlan) {
throw GlutenException("Not implemented");
}
- virtual void parseSplitInfo(const uint8_t* data, int32_t size,
std::optional<std::string> dumpFile) {
+ virtual void parseSplitInfo(const uint8_t* data, int32_t size, int32_t idx,
bool dumpSplit) {
throw GlutenException("Not implemented");
}
@@ -147,11 +150,11 @@ class Runtime : public
std::enable_shared_from_this<Runtime> {
throw GlutenException("Not implemented");
}
- virtual void dumpConf(const std::string& path) {
+ virtual void dumpConf(bool dump) {
throw GlutenException("Not implemented");
}
- virtual std::shared_ptr<ArrowWriter> createArrowWriter(const std::string&
path) {
+ virtual std::shared_ptr<ArrowWriter> createArrowWriter(bool dumpData,
int32_t idx) {
throw GlutenException("Not implemented");
};
@@ -159,7 +162,7 @@ class Runtime : public
std::enable_shared_from_this<Runtime> {
return confMap_;
}
- void setSparkTaskInfo(SparkTaskInfo taskInfo) {
+ virtual void setSparkTaskInfo(SparkTaskInfo taskInfo) {
taskInfo_ = taskInfo;
}
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 965bb2a7c4..ecc816a79b 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -375,7 +375,7 @@ JNIEXPORT jstring JNICALL
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrap
auto planData = safeArray.elems();
auto planSize = env->GetArrayLength(planArray);
auto ctx = getRuntime(env, wrapper);
- ctx->parsePlan(planData, planSize, std::nullopt);
+ ctx->parsePlan(planData, planSize, false);
auto& conf = ctx->getConfMap();
auto planString = ctx->planString(details, conf);
return env->NewStringUTF(planString.c_str());
@@ -414,58 +414,27 @@
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith
ctx->setSparkTaskInfo({stageId, partitionId, taskId});
- std::string saveDir{};
- std::string fileIdentifier = "_" + std::to_string(stageId) + "_" +
std::to_string(partitionId);
- if (saveInput) {
- if (conf.find(kGlutenSaveDir) == conf.end()) {
- throw GlutenException(kGlutenSaveDir + " is not configured.");
- }
- saveDir = conf.at(kGlutenSaveDir);
- std::filesystem::path f{saveDir};
- if (std::filesystem::exists(f)) {
- if (!std::filesystem::is_directory(f)) {
- throw GlutenException("Invalid path for " + kGlutenSaveDir + ": " +
saveDir);
- }
- } else {
- std::error_code ec;
- std::filesystem::create_directory(f, ec);
- if (ec) {
- throw GlutenException("Failed to create directory: " + saveDir + ",
error message: " + ec.message());
- }
- }
- ctx->dumpConf(saveDir + "/conf" + fileIdentifier + ".ini");
- }
+ ctx->dumpConf(saveInput);
auto spillDirStr = jStringToCString(env, spillDir);
auto safePlanArray = getByteArrayElementsSafe(env, planArr);
auto planSize = env->GetArrayLength(planArr);
- ctx->parsePlan(
- safePlanArray.elems(),
- planSize,
- saveInput ? std::optional<std::string>(saveDir + "/plan" +
fileIdentifier + ".json") : std::nullopt);
+ ctx->parsePlan(safePlanArray.elems(), planSize, saveInput);
for (jsize i = 0, splitInfoArraySize = env->GetArrayLength(splitInfosArr); i
< splitInfoArraySize; i++) {
jbyteArray splitInfoArray =
static_cast<jbyteArray>(env->GetObjectArrayElement(splitInfosArr, i));
jsize splitInfoSize = env->GetArrayLength(splitInfoArray);
auto safeSplitArray = getByteArrayElementsSafe(env, splitInfoArray);
auto splitInfoData = safeSplitArray.elems();
- ctx->parseSplitInfo(
- splitInfoData,
- splitInfoSize,
- saveInput ? std::optional<std::string>(saveDir + "/split" +
fileIdentifier + "_" + std::to_string(i) + ".json")
- : std::nullopt);
+ ctx->parseSplitInfo(splitInfoData, splitInfoSize, i, saveInput);
}
// Handle the Java iters
jsize itersLen = env->GetArrayLength(iterArr);
std::vector<std::shared_ptr<ResultIterator>> inputIters;
for (int idx = 0; idx < itersLen; idx++) {
- std::shared_ptr<ArrowWriter> writer = nullptr;
- if (saveInput) {
- auto file = saveDir + "/data" + fileIdentifier + "_" +
std::to_string(idx) + ".parquet";
- writer = ctx->createArrowWriter(file);
- }
+ auto writer = ctx->createArrowWriter(saveInput, idx);
jobject iter = env->GetObjectArrayElement(iterArr, idx);
auto arrayIter = makeJniColumnarBatchIterator(env, iter, ctx, writer);
auto resultIter = std::make_shared<ResultIterator>(std::move(arrayIter));
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc
b/cpp/velox/benchmarks/GenericBenchmark.cc
index 1a48ca815d..b73840fbaf 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -437,9 +437,10 @@ auto BM_Generic = [](::benchmark::State& state,
});
}
*Runtime::localWriteFilesTempPath() = FLAGS_write_path;
- runtime->parsePlan(reinterpret_cast<uint8_t*>(plan.data()), plan.size(),
std::nullopt);
- for (auto& split : splits) {
- runtime->parseSplitInfo(reinterpret_cast<uint8_t*>(split.data()),
split.size(), std::nullopt);
+ runtime->parsePlan(reinterpret_cast<uint8_t*>(plan.data()), plan.size(),
false);
+ for (auto i = 0; i < splits.size(); i++) {
+ auto split = splits[i];
+ runtime->parseSplitInfo(reinterpret_cast<uint8_t*>(split.data()),
split.size(), i, false);
}
auto resultIter = runtime->createResultIterator(veloxSpillDir,
std::move(inputIters), runtime->getConfMap());
diff --git a/cpp/velox/compute/VeloxRuntime.cc
b/cpp/velox/compute/VeloxRuntime.cc
index e2309886a2..a328e9b22d 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -57,6 +57,34 @@
DECLARE_bool(velox_memory_pool_capacity_transfer_across_tasks);
using namespace facebook;
namespace gluten {
+namespace {
+void dumpToStorage(
+ const std::shared_ptr<facebook::velox::config::ConfigBase>& conf,
+ const std::string& dumpFile,
+ const std::string content) {
+ auto saveDir = conf->get<std::string>(kGlutenSaveDir).value();
+ std::filesystem::path f{saveDir};
+ if (std::filesystem::exists(f)) {
+ if (!std::filesystem::is_directory(f)) {
+ throw GlutenException("Invalid path for " + kGlutenSaveDir + ": " +
saveDir);
+ }
+ } else {
+ std::error_code ec;
+ std::filesystem::create_directory(f, ec);
+ if (ec) {
+ throw GlutenException("Failed to create directory: " + saveDir + ",
error message: " + ec.message());
+ }
+ }
+ std::string dumpPath = f / dumpFile;
+ std::ofstream outFile(dumpPath);
+ if (!outFile.is_open()) {
+ LOG(ERROR) << "Failed to open file for writing: " << dumpPath;
+ return;
+ }
+ outFile << content;
+ outFile.close();
+}
+} // namespace
VeloxRuntime::VeloxRuntime(
const std::string& kind,
@@ -78,10 +106,14 @@ VeloxRuntime::VeloxRuntime(
kMemoryPoolCapacityTransferAcrossTasks,
FLAGS_velox_memory_pool_capacity_transfer_across_tasks);
}
-void VeloxRuntime::parsePlan(const uint8_t* data, int32_t size,
std::optional<std::string> dumpFile) {
- if (debugModeEnabled_ || dumpFile.has_value()) {
+void VeloxRuntime::parsePlan(const uint8_t* data, int32_t size, bool dumpPlan)
{
+ if (debugModeEnabled_ || dumpPlan) {
try {
- auto planJson = substraitFromPbToJson("Plan", data, size, dumpFile);
+ auto planJson = substraitFromPbToJson("Plan", data, size);
+ if (dumpPlan) {
+ auto dumpFile = fmt::format("plan_{}_{}_{}.json", taskInfo_.stageId,
taskInfo_.partitionId, taskInfo_.vId);
+ dumpToStorage(veloxCfg_, dumpFile, planJson);
+ }
LOG_IF(INFO, debugModeEnabled_) << std::string(50, '#') << " received
substrait::Plan: " << taskInfo_ << std::endl
<< planJson;
} catch (const std::exception& e) {
@@ -92,10 +124,15 @@ void VeloxRuntime::parsePlan(const uint8_t* data, int32_t
size, std::optional<st
GLUTEN_CHECK(parseProtobuf(data, size, &substraitPlan_) == true, "Parse
substrait plan failed");
}
-void VeloxRuntime::parseSplitInfo(const uint8_t* data, int32_t size,
std::optional<std::string> dumpFile) {
- if (debugModeEnabled_ || dumpFile.has_value()) {
+void VeloxRuntime::parseSplitInfo(const uint8_t* data, int32_t size, int32_t
idx, bool dumpSplit) {
+ if (debugModeEnabled_ || dumpSplit) {
try {
- auto splitJson = substraitFromPbToJson("ReadRel.LocalFiles", data, size,
dumpFile);
+ auto splitJson = substraitFromPbToJson("ReadRel.LocalFiles", data, size);
+ if (dumpSplit) {
+ auto dumpFile =
+ fmt::format("split_{}_{}_{}_{}.json", taskInfo_.stageId,
taskInfo_.partitionId, taskInfo_.vId, idx);
+ dumpToStorage(veloxCfg_, dumpFile, splitJson);
+ }
LOG_IF(INFO, debugModeEnabled_) << std::string(50, '#')
<< " received
substrait::ReadRel.LocalFiles: " << taskInfo_ << std::endl
<< splitJson;
@@ -283,7 +320,11 @@ std::unique_ptr<ColumnarBatchSerializer>
VeloxRuntime::createColumnarBatchSerial
return std::make_unique<VeloxColumnarBatchSerializer>(arrowPool, veloxPool,
cSchema);
}
-void VeloxRuntime::dumpConf(const std::string& path) {
+void VeloxRuntime::dumpConf(bool dump) {
+ if (!dump) {
+ return;
+ }
+
const auto& backendConfMap =
VeloxBackend::get()->getBackendConf()->rawConfigs();
auto allConfMap = backendConfMap;
@@ -291,13 +332,7 @@ void VeloxRuntime::dumpConf(const std::string& path) {
allConfMap.insert_or_assign(pair.first, pair.second);
}
- // Open file "velox.conf" for writing, automatically creating it if it
doesn't exist,
- // or overwriting it if it does.
- std::ofstream outFile(path);
- if (!outFile.is_open()) {
- LOG(ERROR) << "Failed to open file for writing: " << path;
- return;
- }
+ std::stringstream out;
// Calculate the maximum key length for alignment.
size_t maxKeyLength = 0;
@@ -306,24 +341,29 @@ void VeloxRuntime::dumpConf(const std::string& path) {
}
// Write each key-value pair to the file with adjusted spacing for alignment
- outFile << "[Backend Conf]" << std::endl;
+ out << "[Backend Conf]" << std::endl;
for (const auto& pair : backendConfMap) {
- outFile << std::left << std::setw(maxKeyLength + 1) << pair.first << ' '
<< pair.second << std::endl;
+ out << std::left << std::setw(maxKeyLength + 1) << pair.first << ' ' <<
pair.second << std::endl;
}
- outFile << std::endl << "[Session Conf]" << std::endl;
+ out << std::endl << "[Session Conf]" << std::endl;
for (const auto& pair : confMap_) {
- outFile << std::left << std::setw(maxKeyLength + 1) << pair.first << ' '
<< pair.second << std::endl;
+ out << std::left << std::setw(maxKeyLength + 1) << pair.first << ' ' <<
pair.second << std::endl;
}
- outFile.close();
+ auto dumpPath = fmt::format("conf_{}_{}_{}.ini", taskInfo_.stageId,
taskInfo_.partitionId, taskInfo_.vId);
+ dumpToStorage(veloxCfg_, dumpPath, out.str());
}
-std::shared_ptr<ArrowWriter> VeloxRuntime::createArrowWriter(const
std::string& path) {
- int64_t batchSize = 4096;
- if (auto it = confMap_.find(kSparkBatchSize); it != confMap_.end()) {
- batchSize = std::atol(it->second.c_str());
+std::shared_ptr<ArrowWriter> VeloxRuntime::createArrowWriter(bool dumpData,
int32_t idx) {
+ if (!dumpData) {
+ return nullptr;
}
- return std::make_shared<VeloxArrowWriter>(path, batchSize,
memoryManager()->getLeafMemoryPool().get());
+
+ auto saveDir = veloxCfg_->get<std::string>(kGlutenSaveDir).value();
+ auto dumpPath =
+ fmt::format("{}/data_{}_{}_{}_{}.parquet", saveDir, taskInfo_.stageId,
taskInfo_.partitionId, taskInfo_.vId, idx);
+ auto batchSize = veloxCfg_->get<int64_t>(kSparkBatchSize, 4096);
+ return std::make_shared<VeloxArrowWriter>(dumpPath, batchSize,
memoryManager()->getLeafMemoryPool().get());
}
} // namespace gluten
diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h
index 798fa5bc72..163e09b2ea 100644
--- a/cpp/velox/compute/VeloxRuntime.h
+++ b/cpp/velox/compute/VeloxRuntime.h
@@ -35,9 +35,15 @@ class VeloxRuntime final : public Runtime {
VeloxMemoryManager* vmm,
const std::unordered_map<std::string, std::string>& confMap);
- void parsePlan(const uint8_t* data, int32_t size, std::optional<std::string>
dumpFile) override;
+ void setSparkTaskInfo(SparkTaskInfo taskInfo) override {
+ static std::atomic<uint32_t> vtId{0};
+ taskInfo_ = taskInfo;
+ taskInfo_.vId = vtId++;
+ }
+
+ void parsePlan(const uint8_t* data, int32_t size, bool dumpPlan) override;
- void parseSplitInfo(const uint8_t* data, int32_t size,
std::optional<std::string> dumpFile) override;
+ void parseSplitInfo(const uint8_t* data, int32_t size, int32_t idx, bool
dumpSplitInfo) override;
VeloxMemoryManager* memoryManager() override;
@@ -74,9 +80,9 @@ class VeloxRuntime final : public Runtime {
std::string planString(bool details, const std::unordered_map<std::string,
std::string>& sessionConf) override;
- void dumpConf(const std::string& path) override;
+ void dumpConf(bool dump) override;
- std::shared_ptr<ArrowWriter> createArrowWriter(const std::string& path)
override;
+ std::shared_ptr<ArrowWriter> createArrowWriter(bool dumpData, int32_t idx)
override;
std::shared_ptr<VeloxDataSource> createDataSource(const std::string&
filePath, std::shared_ptr<arrow::Schema> schema);
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc
b/cpp/velox/compute/WholeStageResultIterator.cc
index 666610d907..79d7669737 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -82,13 +82,12 @@ WholeStageResultIterator::WholeStageResultIterator(
std::unordered_set<velox::core::PlanNodeId> emptySet;
velox::core::PlanFragment planFragment{planNode,
velox::core::ExecutionStrategy::kUngrouped, 1, emptySet};
std::shared_ptr<velox::core::QueryCtx> queryCtx = createNewVeloxQueryCtx();
- static std::atomic<uint32_t> vtId{0}; // Velox task ID to distinguish from
Spark task ID.
task_ = velox::exec::Task::create(
fmt::format(
"Gluten_Stage_{}_TID_{}_VTID_{}",
std::to_string(taskInfo_.stageId),
std::to_string(taskInfo_.taskId),
- std::to_string(vtId++)),
+ std::to_string(taskInfo.vId)),
std::move(planFragment),
0,
std::move(queryCtx),
@@ -179,7 +178,6 @@ WholeStageResultIterator::WholeStageResultIterator(
std::shared_ptr<velox::core::QueryCtx>
WholeStageResultIterator::createNewVeloxQueryCtx() {
std::unordered_map<std::string, std::shared_ptr<velox::config::ConfigBase>>
connectorConfigs;
connectorConfigs[kHiveConnectorId] = createConnectorConfig();
- static std::atomic<uint32_t> vqId{0}; // Velox query ID, same with taskId.
std::shared_ptr<velox::core::QueryCtx> ctx = velox::core::QueryCtx::create(
nullptr,
facebook::velox::core::QueryConfig{getQueryContextConf()},
@@ -191,7 +189,7 @@ std::shared_ptr<velox::core::QueryCtx>
WholeStageResultIterator::createNewVeloxQ
"Gluten_Stage_{}_TID_{}_VTID_{}",
std::to_string(taskInfo_.stageId),
std::to_string(taskInfo_.taskId),
- std::to_string(vqId++)));
+ std::to_string(taskInfo_.vId)));
return ctx;
}
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 25e946b68b..c5f1d2ded9 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -138,7 +138,7 @@
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeValidateWithFail
const auto runtime = dynamic_cast<VeloxRuntime*>(ctx);
if (runtime->debugModeEnabled()) {
try {
- const auto jsonPlan = substraitFromPbToJson("Plan", planData, planSize,
std::nullopt);
+ const auto jsonPlan = substraitFromPbToJson("Plan", planData, planSize);
LOG(INFO) << std::string(50, '#') << " received substrait::Plan: for
validation";
LOG(INFO) << jsonPlan;
} catch (const std::exception& e) {
diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc
index 212b0201ed..e15d3bdb27 100644
--- a/cpp/velox/tests/RuntimeTest.cc
+++ b/cpp/velox/tests/RuntimeTest.cc
@@ -50,9 +50,9 @@ class DummyRuntime final : public Runtime {
const std::unordered_map<std::string, std::string>& conf)
: Runtime(kind, mm, conf) {}
- void parsePlan(const uint8_t* data, int32_t size, std::optional<std::string>
dumpFile) override {}
+ void parsePlan(const uint8_t* data, int32_t size, bool dump) override {}
- void parseSplitInfo(const uint8_t* data, int32_t size,
std::optional<std::string> dumpFile) override {}
+ void parseSplitInfo(const uint8_t* data, int32_t size, int32_t idx, bool
dump) override {}
std::shared_ptr<ResultIterator> createResultIterator(
const std::string& spillDir,
@@ -99,11 +99,11 @@ class DummyRuntime final : public Runtime {
throw GlutenException("Not yet implemented");
}
- void dumpConf(const std::string& path) override {
+ void dumpConf(bool dump) override {
throw GlutenException("Not yet implemented");
}
- std::shared_ptr<ArrowWriter> createArrowWriter(const std::string& path)
override {
+ std::shared_ptr<ArrowWriter> createArrowWriter(bool dump, int32_t) override {
throw GlutenException("Not yet implemented");
}
diff --git a/docs/developers/MicroBenchmarks.md
b/docs/developers/MicroBenchmarks.md
index ccb01d4737..2121d2a1de 100644
--- a/docs/developers/MicroBenchmarks.md
+++ b/docs/developers/MicroBenchmarks.md
@@ -43,16 +43,17 @@ mvn test -Pspark-3.2 -Pbackends-velox -pl backends-velox
-am \
The generated example files are placed in gluten/backends-velox:
+- stageId means the Spark stage id shown in the web UI.
+- partitionId means the Spark partition id shown in the stage page of the web UI.
+- vId means the backend's internal virtual id.
+
```shell
$ tree gluten/backends-velox/generated-native-benchmark/
gluten/backends-velox/generated-native-benchmark/
-├── example.json
-├── example_lineitem
-│ ├── part-00000-3ec19189-d20e-4240-85ae-88631d46b612-c000.snappy.parquet
-│ └── _SUCCESS
-└── example_orders
- ├── part-00000-1e66fb98-4dd6-47a6-8679-8625dbc437ee-c000.snappy.parquet
- └── _SUCCESS
+├── conf_12_10_3.ini
+├── data_12_10_3_0.parquet
+├── data_12_10_3_1.parquet
+└── plan_12_10_3.json
```
Run micro benchmark with the generated files as input. You need to specify the
**absolute** path to
@@ -61,9 +62,10 @@ the input files:
```shell
cd /path/to/gluten/cpp/build/velox/benchmarks
./generic_benchmark \
---plan
/home/sparkuser/github/apache/incubator-gluten/backends-velox/generated-native-benchmark/example.json
\
---data
/home/sparkuser/github/apache/incubator-gluten/backends-velox/generated-native-benchmark/example_orders/part-00000-1e66fb98-4dd6-47a6-8679-8625dbc437ee-c000.snappy.parquet,\
-/home/sparkuser/github/apache/incubator-gluten/backends-velox/generated-native-benchmark/example_lineitem/part-00000-3ec19189-d20e-4240-85ae-88631d46b612-c000.snappy.parquet
\
+--plan
<path-to-gluten>/backends-velox/generated-native-benchmark/plan_{stageId}_{partitionId}_{vId}.json
\
+--data
<path-to-gluten>/backends-velox/generated-native-benchmark/data_{stageId}_{partitionId}_{vId}_{iteratorIdx}.parquet,\
+<path-to-gluten>/backends-velox/generated-native-benchmark/data_{stageId}_{partitionId}_{vId}_{iteratorIdx}.parquet
\
+--conf
<path-to-gluten>/backends-velox/generated-native-benchmark/conf_{stageId}_{partitionId}_{vId}.ini
\
--threads 1 --iterations 1 --noprint-result
```
@@ -128,15 +130,15 @@ And then re-run the query with below configurations to
dump the inputs to micro
Check the files in `spark.gluten.saveDir`. If the simulated stage is a first
stage, you will get 3
or 4 types of dumped file:
-- Configuration file: INI formatted, file name
`conf_[stageId]_[partitionId].ini`. Contains the
+- Configuration file: INI formatted, file name
`conf_{stageId}_{partitionId}_{vId}.ini`. Contains the
configurations to init Velox backend and runtime session.
-- Plan file: JSON formatted, file name `plan_[stageId]_[partitionId].json`.
Contains the substrait
+- Plan file: JSON formatted, file name
`plan_{stageId}_{partitionId}_{vId}.json`. Contains the substrait
plan to the stage, without input file splits.
-- Split file: JSON formatted, file name
`split_[stageId]_[partitionId]_[splitIndex].json`. There can
+- Split file: JSON formatted, file name
`split_{stageId}_{partitionId}_{vId}_{splitIdx}.json`. There can
be more than one split file in a first stage task. Contains the substrait
plan piece to the input
file splits.
- Data file(optional): Parquet formatted, file
- name `data_[stageId]_[partitionId]_[iteratorIndex].parquet`. If the first
stage contains one or
+ name `data_{stageId}_{partitionId}_{vId}_{iteratorIdx}.parquet`. If the
first stage contains one or
more BHJ operators, there can be one or more input data files. The input
data files of a first
stage will be loaded as iterators to serve as the inputs for the pipeline:
@@ -158,29 +160,29 @@ Sample command:
```shell
cd /path/to/gluten/cpp/build/velox/benchmarks
./generic_benchmark \
---conf /absolute_path/to/conf_[stageId]_[partitionId].ini \
---plan /absolute_path/to/plan_[stageId]_[partitionId].json \
---split
/absolut_path/to/split_[stageId]_[partitionId]_0.json,/absolut_path/to/split_[stageId]_[partitionId]_1.json
\
+--conf /absolute_path/to/conf_{stageId}_{partitionId}_{vId}.ini \
+--plan /absolute_path/to/plan_{stageId}_{partitionId}_{vId}.json \
+--split
/absolute_path/to/split_{stageId}_{partitionId}_{vId}_{splitIdx}.json,/absolute_path/to/split_{stageId}_{partitionId}_{vId}_{splitIdx}.json
\
--threads 1 --noprint-result
# If the stage requires data files, use --data-file to specify the absolute
path.
cd /path/to/gluten/cpp/build/velox/benchmarks
./generic_benchmark \
---conf /absolute_path/to/conf_[stageId]_[partitionId].ini \
---plan /absolute_path/to/plan_[stageId]_[partitionId].json \
---split
/absolut_path/to/split_[stageId]_[partitionId]_0.json,/absolut_path/to/split_[stageId]_[partitionId]_1.json
\
---data
/absolut_path/to/data_[stageId]_[partitionId]_0.parquet,/absolut_path/to/data_[stageId]_[partitionId]_1.parquet
\
+--conf /absolute_path/to/conf_{stageId}_{partitionId}_{vId}.ini \
+--plan /absolute_path/to/plan_{stageId}_{partitionId}_{vId}.json \
+--split
/absolute_path/to/split_{stageId}_{partitionId}_{vId}_{splitIdx}.json,/absolute_path/to/split_{stageId}_{partitionId}_{vId}_{splitIdx}.json
\
+--data
/absolute_path/to/data_{stageId}_{partitionId}_{vId}_{iteratorIdx}.parquet,/absolute_path/to/data_{stageId}_{partitionId}_{vId}_{iteratorIdx}.parquet
\
--threads 1 --noprint-result
```
If the simulated stage is a middle stage, which means pure shuffle stage, you
will get 3 types of
dumped file:
-- Configuration file: INI formatted, file name
`conf_[stageId]_[partitionId].ini`. Contains the
+- Configuration file: INI formatted, file name
`conf_{stageId}_{partitionId}_{vId}.ini`. Contains the
configurations to init Velox backend and runtime session.
-- Plan file: JSON formatted, file name `plan_[stageId]_[partitionId].json`.
Contains the substrait
+- Plan file: JSON formatted, file name
`plan_{stageId}_{partitionId}_{vId}.json`. Contains the substrait
plan to the stage.
-- Data file: Parquet formatted, file name
`data_[stageId]_[partitionId]_[iteratorIndex].parquet`.
+- Data file: Parquet formatted, file name
`data_{stageId}_{partitionId}_{vId}_{iteratorIdx}.parquet`.
There can be more than one input data file in a middle stage task. The input
data files of a
middle stage will be loaded as iterators to serve as the inputs for the
pipeline:
@@ -199,22 +201,22 @@ Sample command:
```shell
cd /path/to/gluten/cpp/build/velox/benchmarks
./generic_benchmark \
---conf /absolute_path/to/conf_[stageId]_[partitionId].ini \
---plan /absolute_path/to/plan_[stageId]_[partitionId].json \
---data
/absolut_path/to/data_[stageId]_[partitionId]_0.parquet,/absolut_path/to/data_[stageId]_[partitionId]_1.parquet
\
+--conf /absolute_path/to/conf_{stageId}_{partitionId}_{vId}.ini \
+--plan /absolute_path/to/plan_{stageId}_{partitionId}_{vId}.json \
+--data
/absolute_path/to/data_{stageId}_{partitionId}_{vId}_{iteratorIdx}.parquet,/absolute_path/to/data_{stageId}_{partitionId}_{vId}_{iteratorIdx}.parquet
\
--threads 1 --noprint-result
```
For some complex queries, the stageId alone may not identify the Substrait plan
input; please get the
taskId from the Spark UI, and get your target parquet from saveDir.
-In this example, only one partition input with partition id 2, taskId is 36,
iterator length is 2.
+In this example, there is only one partition input, with stage id 3 and
partition id 36; vId is 0 and the iterator length is 2.
```shell
cd /path/to/gluten/cpp/build/velox/benchmarks
./generic_benchmark \
---plan /absolute_path/to/complex_plan.json \
---data
/absolute_path/to/data_36_2_0.parquet,/absolute_path/to/data_36_2_1.parquet \
+--plan /absolute_path/to/plan_3_36_0.json \
+--data
/absolute_path/to/data_3_36_0_0.parquet,/absolute_path/to/data_3_36_0_1.parquet
\
--threads 1 --noprint-result
```
@@ -229,8 +231,8 @@ details.
```shell
cd /path/to/gluten/cpp/build/velox/benchmarks
./generic_benchmark \
---plan /absolute_path/to/plan.json \
---data /absolute_path/to/data.parquet
+--plan /absolute_path/to/plan_{stageId}_{partitionId}_{vId}.json \
+--data
/absolute_path/to/data_{stageId}_{partitionId}_{vId}_{iteratorIdx}.parquet
--threads 1 --noprint-result --save-output /absolute_path/to/result.parquet
```
@@ -245,8 +247,8 @@ details.
```shell
cd /path/to/gluten/cpp/build/velox/benchmarks
./generic_benchmark \
---plan /absolute_path/to/plan.json \
---split /absolute_path/to/split.json \
+--plan /absolute_path/to/plan_{stageId}_{partitionId}_{vId}.json \
+--split /absolute_path/to/split_{stageId}_{partitionId}_{vId}_{splitIdx}.json \
--threads 1 --noprint-result --with-shuffle
```
@@ -311,9 +313,9 @@ General configurations for shuffle write:
```shell
cd /path/to/gluten/cpp/build/velox/benchmarks
./generic_benchmark \
---plan /path/to/saveDir/plan_1_0.json \
---conf /path/to/saveDir/conf_1_0.ini \
---split /path/to/saveDir/split_1_0_0.json \
+--plan /path/to/saveDir/plan_{stageId}_{partitionId}_{vId}.json \
+--conf /path/to/saveDir/conf_{stageId}_{partitionId}_{vId}.ini \
+--split /path/to/saveDir/split_{stageId}_{partitionId}_{vId}_{splitIdx}.json \
--with-shuffle \
--shuffle-writer sort \
--partitioning hash \
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]