This is an automated email from the ASF dual-hosted git repository.
yangzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 8af7b3ccb [VL] Fix wrong task info when log split info (#5167)
8af7b3ccb is described below
commit 8af7b3ccb3984b80de3f6ee0168bd9d2bd99a025
Author: Yang Zhang <[email protected]>
AuthorDate: Fri Mar 29 12:03:27 2024 +0800
[VL] Fix wrong task info when log split info (#5167)
---
cpp/core/compute/Runtime.h | 13 +++++--------
cpp/core/jni/JniWrapper.cc | 23 ++++++++++++-----------
cpp/velox/benchmarks/GenericBenchmark.cc | 2 +-
cpp/velox/compute/VeloxRuntime.cc | 7 +------
cpp/velox/compute/VeloxRuntime.h | 3 +--
cpp/velox/tests/RuntimeTest.cc | 3 +--
6 files changed, 21 insertions(+), 30 deletions(-)
diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index f8a3a5a24..7574b0219 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -64,17 +64,14 @@ class Runtime : public
std::enable_shared_from_this<Runtime> {
Runtime(const std::unordered_map<std::string, std::string>& confMap) :
confMap_(confMap) {}
virtual ~Runtime() = default;
- /// Parse and cache the plan.
- /// Return true if parsed successfully.
- virtual void
- parsePlan(const uint8_t* data, int32_t size, SparkTaskInfo taskInfo,
std::optional<std::string> dumpFile) = 0;
+ virtual void parsePlan(const uint8_t* data, int32_t size,
std::optional<std::string> dumpFile) = 0;
+
+ virtual void parseSplitInfo(const uint8_t* data, int32_t size,
std::optional<std::string> dumpFile) = 0;
virtual std::string planString(bool details, const
std::unordered_map<std::string, std::string>& sessionConf) = 0;
virtual void injectWriteFilesTempPath(const std::string& path) = 0;
- virtual void parseSplitInfo(const uint8_t* data, int32_t size,
std::optional<std::string> dumpFile) = 0;
-
// Just for benchmark
::substrait::Plan& getPlan() {
return substraitPlan_;
@@ -134,8 +131,8 @@ class Runtime : public
std::enable_shared_from_this<Runtime> {
return confMap_;
}
- SparkTaskInfo getSparkTaskInfo() {
- return taskInfo_;
+ void setSparkTaskInfo(SparkTaskInfo taskInfo) {
+ taskInfo_ = taskInfo;
}
ObjectStore* objectStore() {
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 17a07aa09..39080d0ca 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -341,7 +341,7 @@ JNIEXPORT jstring JNICALL
Java_io_glutenproject_vectorized_PlanEvaluatorJniWrapp
auto planData = safeArray.elems();
auto planSize = env->GetArrayLength(planArray);
auto ctx = gluten::getRuntime(env, wrapper);
- ctx->parsePlan(planData, planSize, {}, std::nullopt);
+ ctx->parsePlan(planData, planSize, std::nullopt);
auto& conf = ctx->getConfMap();
auto planString = ctx->planString(details, conf);
return env->NewStringUTF(planString.c_str());
@@ -382,6 +382,8 @@
Java_io_glutenproject_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWithI
auto ctx = gluten::getRuntime(env, wrapper);
auto& conf = ctx->getConfMap();
+ ctx->setSparkTaskInfo({stageId, partitionId, taskId});
+
std::string saveDir{};
std::string fileIdentifier = "_" + std::to_string(stageId) + "_" +
std::to_string(partitionId);
if (saveInput) {
@@ -400,11 +402,18 @@
Java_io_glutenproject_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWithI
auto spillDirStr = jStringToCString(env, spillDir);
+ auto safePlanArray = gluten::getByteArrayElementsSafe(env, planArr);
+ auto planSize = env->GetArrayLength(planArr);
+ ctx->parsePlan(
+ safePlanArray.elems(),
+ planSize,
+ saveInput ? std::optional<std::string>(saveDir + "/plan" +
fileIdentifier + ".json") : std::nullopt);
+
for (jsize i = 0, splitInfoArraySize = env->GetArrayLength(splitInfosArr); i
< splitInfoArraySize; i++) {
jbyteArray splitInfoArray =
static_cast<jbyteArray>(env->GetObjectArrayElement(splitInfosArr, i));
jsize splitInfoSize = env->GetArrayLength(splitInfoArray);
- auto safeArray = gluten::getByteArrayElementsSafe(env, splitInfoArray);
- auto splitInfoData = safeArray.elems();
+ auto safeSplitArray = gluten::getByteArrayElementsSafe(env,
splitInfoArray);
+ auto splitInfoData = safeSplitArray.elems();
ctx->parseSplitInfo(
splitInfoData,
splitInfoSize,
@@ -412,14 +421,6 @@
Java_io_glutenproject_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWithI
: std::nullopt);
}
- auto safeArray = gluten::getByteArrayElementsSafe(env, planArr);
- auto planSize = env->GetArrayLength(planArr);
- ctx->parsePlan(
- safeArray.elems(),
- planSize,
- {stageId, partitionId, taskId},
- saveInput ? std::optional<std::string>(saveDir + "/plan" +
fileIdentifier + ".json") : std::nullopt);
-
// Handle the Java iters
jsize itersLen = env->GetArrayLength(iterArr);
std::vector<std::shared_ptr<ResultIterator>> inputIters;
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc
b/cpp/velox/benchmarks/GenericBenchmark.cc
index a9084943d..f2e275911 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -174,7 +174,7 @@ auto BM_Generic = [](::benchmark::State& state,
});
}
runtime->injectWriteFilesTempPath(FLAGS_write_path);
- runtime->parsePlan(reinterpret_cast<uint8_t*>(plan.data()), plan.size(),
{}, std::nullopt);
+ runtime->parsePlan(reinterpret_cast<uint8_t*>(plan.data()), plan.size(),
std::nullopt);
for (auto& split : splits) {
runtime->parseSplitInfo(reinterpret_cast<uint8_t*>(split.data()),
split.size(), std::nullopt);
}
diff --git a/cpp/velox/compute/VeloxRuntime.cc
b/cpp/velox/compute/VeloxRuntime.cc
index d1b1bbda0..46d9bc85f 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -45,12 +45,7 @@ VeloxRuntime::VeloxRuntime(const
std::unordered_map<std::string, std::string>& c
FLAGS_v = veloxCfg_->get<uint32_t>(kGlogVerboseLevel, FLAGS_v);
}
-void VeloxRuntime::parsePlan(
- const uint8_t* data,
- int32_t size,
- SparkTaskInfo taskInfo,
- std::optional<std::string> dumpFile) {
- taskInfo_ = taskInfo;
+void VeloxRuntime::parsePlan(const uint8_t* data, int32_t size,
std::optional<std::string> dumpFile) {
if (debugModeEnabled_ || dumpFile.has_value()) {
try {
auto planJson = substraitFromPbToJson("Plan", data, size, dumpFile);
diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h
index 0233b3f82..e2097edb1 100644
--- a/cpp/velox/compute/VeloxRuntime.h
+++ b/cpp/velox/compute/VeloxRuntime.h
@@ -35,8 +35,7 @@ class VeloxRuntime final : public Runtime {
public:
explicit VeloxRuntime(const std::unordered_map<std::string, std::string>&
confMap);
- void parsePlan(const uint8_t* data, int32_t size, SparkTaskInfo taskInfo,
std::optional<std::string> dumpFile)
- override;
+ void parsePlan(const uint8_t* data, int32_t size, std::optional<std::string>
dumpFile) override;
void parseSplitInfo(const uint8_t* data, int32_t size,
std::optional<std::string> dumpFile) override;
diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc
index 0c51f0bd0..377d76054 100644
--- a/cpp/velox/tests/RuntimeTest.cc
+++ b/cpp/velox/tests/RuntimeTest.cc
@@ -25,8 +25,7 @@ class DummyRuntime final : public Runtime {
public:
DummyRuntime(const std::unordered_map<std::string, std::string>& conf) :
Runtime(conf) {}
- void parsePlan(const uint8_t* data, int32_t size, SparkTaskInfo taskInfo,
std::optional<std::string> dumpFile)
- override {}
+ void parsePlan(const uint8_t* data, int32_t size, std::optional<std::string>
dumpFile) override {}
void parseSplitInfo(const uint8_t* data, int32_t size,
std::optional<std::string> dumpFile) override {}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]