This is an automated email from the ASF dual-hosted git repository.
zhli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new e78ee4393 [VL] Pass partition id to velox functions (#4344)
e78ee4393 is described below
commit e78ee43938109c2589068b5cae0293d25848a4d7
Author: Zhen Li <[email protected]>
AuthorDate: Mon Mar 11 12:07:06 2024 +0800
[VL] Pass partition id to velox functions (#4344)
[VL] Pass partition id to velox functions.
---
.../io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala | 2 ++
.../io/glutenproject/backendsapi/velox/IteratorApiImpl.scala | 8 ++++++--
cpp/velox/benchmarks/PlanValidatorUtil.cc | 4 +++-
cpp/velox/compute/WholeStageResultIterator.cc | 2 ++
cpp/velox/jni/VeloxJniWrapper.cc | 3 ++-
.../src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala | 2 ++
.../io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala | 1 +
.../glutenproject/execution/WholeStageZippedPartitionsRDD.scala | 1 +
.../java/io/glutenproject/vectorized/NativePlanEvaluator.java | 4 ++--
9 files changed, 21 insertions(+), 6 deletions(-)
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
index 5b6d326a6..197e80a17 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
@@ -178,6 +178,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
pipelineTime: SQLMetric,
updateInputMetrics: InputMetricsWrapper => Unit,
updateNativeMetrics: IMetrics => Unit,
+ partitionIndex: Int,
inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()
): Iterator[ColumnarBatch] = {
@@ -246,6 +247,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
rootNode: PlanNode,
pipelineTime: SQLMetric,
updateNativeMetrics: IMetrics => Unit,
+ partitionIndex: Int,
materializeInput: Boolean): Iterator[ColumnarBatch] = {
// scalastyle:on argcount
GlutenConfig.getConf
diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala
index 180957f17..f2943d31d 100644
--- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala
@@ -146,6 +146,7 @@ class IteratorApiImpl extends IteratorApi with Logging {
pipelineTime: SQLMetric,
updateInputMetrics: (InputMetricsWrapper) => Unit,
updateNativeMetrics: IMetrics => Unit,
+ partitionIndex: Int,
inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()):
Iterator[ColumnarBatch] = {
assert(
inputPartition.isInstanceOf[GlutenPartition],
@@ -165,7 +166,8 @@ class IteratorApiImpl extends IteratorApi with Logging {
transKernel.createKernelWithBatchIterator(
inputPartition.plan,
splitInfoByteArray,
- columnarNativeIterators)
+ columnarNativeIterators,
+ partitionIndex)
pipelineTime += TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
beforeBuild)
Iterators
@@ -193,6 +195,7 @@ class IteratorApiImpl extends IteratorApi with Logging {
rootNode: PlanNode,
pipelineTime: SQLMetric,
updateNativeMetrics: IMetrics => Unit,
+ partitionIndex: Int,
materializeInput: Boolean): Iterator[ColumnarBatch] = {
ExecutorManager.tryTaskSet(numaBindingInfo)
@@ -207,7 +210,8 @@ class IteratorApiImpl extends IteratorApi with Logging {
rootNode.toProtobuf.toByteArray,
// Final iterator does not contain scan split, so pass empty split
info to native here.
new Array[Array[Byte]](0),
- columnarNativeIterator
+ columnarNativeIterator,
+ partitionIndex
)
Iterators
diff --git a/cpp/velox/benchmarks/PlanValidatorUtil.cc b/cpp/velox/benchmarks/PlanValidatorUtil.cc
index 229804368..e299b4620 100644
--- a/cpp/velox/benchmarks/PlanValidatorUtil.cc
+++ b/cpp/velox/benchmarks/PlanValidatorUtil.cc
@@ -22,6 +22,7 @@
#include "memory/VeloxMemoryManager.h"
#include "substrait/SubstraitToVeloxPlanValidator.h"
+using namespace facebook::velox;
using namespace gluten;
/// Set spark.gluten.sql.debug=true to get validation plan and dump it into a
json file,
@@ -43,7 +44,8 @@ int main(int argc, char** argv) {
std::unordered_map<std::string, std::string> conf;
conf.insert({kDebugModeEnabled, "true"});
initVeloxBackend(conf);
- core::QueryCtx queryCtx;
+ std::unordered_map<std::string, std::string>
configs{{core::QueryConfig::kSparkPartitionId, "0"}};
+ core::QueryCtx queryCtx(nullptr, core::QueryConfig(configs));
auto pool = defaultLeafVeloxMemoryPool().get();
core::ExecCtx execCtx(pool, &queryCtx);
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc
index b74b8504d..1b217ebeb 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -509,6 +509,8 @@ std::unordered_map<std::string, std::string> WholeStageResultIterator::getQueryC
// Disable driver cpu time slicing.
configs[velox::core::QueryConfig::kDriverCpuTimeSliceLimitMs] = "0";
+ configs[velox::core::QueryConfig::kSparkPartitionId] =
std::to_string(taskInfo_.partitionId);
+
} catch (const std::invalid_argument& err) {
std::string errDetails = err.what();
throw std::runtime_error("Invalid conf arg: " + errDetails);
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 32f1bd0c8..063605001 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -110,7 +110,8 @@ Java_io_glutenproject_vectorized_PlanEvaluatorJniWrapper_nativeValidateWithFailu
gluten::parseProtobuf(planData, planSize, &subPlan);
// A query context used for function validation.
- velox::core::QueryCtx queryCtx;
+ std::unordered_map<std::string, std::string>
configs{{velox::core::QueryConfig::kSparkPartitionId, "0"}};
+ velox::core::QueryCtx queryCtx(nullptr, velox::core::QueryConfig(configs));
auto pool = gluten::defaultLeafVeloxMemoryPool().get();
// An execution context used for function validation.
velox::core::ExecCtx execCtx(pool, &queryCtx);
diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala
index b68080cd9..2f506f483 100644
--- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala
@@ -60,6 +60,7 @@ trait IteratorApi {
pipelineTime: SQLMetric,
updateInputMetrics: (InputMetricsWrapper) => Unit,
updateNativeMetrics: IMetrics => Unit,
+ partitionIndex: Int,
inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()
): Iterator[ColumnarBatch]
@@ -76,6 +77,7 @@ trait IteratorApi {
rootNode: PlanNode,
pipelineTime: SQLMetric,
updateNativeMetrics: IMetrics => Unit,
+ partitionIndex: Int,
materializeInput: Boolean = false): Iterator[ColumnarBatch]
// scalastyle:on argcount
diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala b/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala
index 0aa4eec7f..36eb35203 100644
--- a/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala
@@ -95,6 +95,7 @@ class GlutenWholeStageColumnarRDD(
pipelineTime,
updateInputMetrics,
updateNativeMetrics,
+ split.index,
inputIterators
)
}
diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageZippedPartitionsRDD.scala b/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageZippedPartitionsRDD.scala
index 6c48159f3..9a11326c5 100644
--- a/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageZippedPartitionsRDD.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageZippedPartitionsRDD.scala
@@ -55,6 +55,7 @@ class WholeStageZippedPartitionsRDD(
resCtx.root,
pipelineTime,
updateNativeMetrics,
+ split.index,
materializeInput
)
}
diff --git a/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java b/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java
index d81e7abee..7dc6884aa 100644
--- a/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java
+++ b/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java
@@ -61,7 +61,7 @@ public class NativePlanEvaluator {
// Used by WholeStageTransform to create the native computing pipeline and
// return a columnar result iterator.
public GeneralOutIterator createKernelWithBatchIterator(
- byte[] wsPlan, byte[][] splitInfo, List<GeneralInIterator> iterList)
+ byte[] wsPlan, byte[][] splitInfo, List<GeneralInIterator> iterList, int
partitionIndex)
throws RuntimeException, IOException {
final AtomicReference<ColumnarBatchOutIterator> outIterator = new
AtomicReference<>();
final NativeMemoryManager nmm =
@@ -101,7 +101,7 @@ public class NativePlanEvaluator {
splitInfo,
iterList.toArray(new GeneralInIterator[0]),
TaskContext.get().stageId(),
- TaskContext.getPartitionId(),
+ partitionIndex, // TaskContext.getPartitionId(),
TaskContext.get().taskAttemptId(),
DebugUtil.saveInputToFile(),
BackendsApiManager.getSparkPlanExecApiInstance().rewriteSpillPath(spillDirPath));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]