This is an automated email from the ASF dual-hosted git repository.

zhli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new e78ee4393 [VL] Pass partition id to velox functions (#4344)
e78ee4393 is described below

commit e78ee43938109c2589068b5cae0293d25848a4d7
Author: Zhen Li <[email protected]>
AuthorDate: Mon Mar 11 12:07:06 2024 +0800

    [VL] Pass partition id to velox functions (#4344)
    
    [VL] Pass partition id to velox functions.
---
 .../io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala   | 2 ++
 .../io/glutenproject/backendsapi/velox/IteratorApiImpl.scala      | 8 ++++++--
 cpp/velox/benchmarks/PlanValidatorUtil.cc                         | 4 +++-
 cpp/velox/compute/WholeStageResultIterator.cc                     | 2 ++
 cpp/velox/jni/VeloxJniWrapper.cc                                  | 3 ++-
 .../src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala | 2 ++
 .../io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala  | 1 +
 .../glutenproject/execution/WholeStageZippedPartitionsRDD.scala   | 1 +
 .../java/io/glutenproject/vectorized/NativePlanEvaluator.java     | 4 ++--
 9 files changed, 21 insertions(+), 6 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
index 5b6d326a6..197e80a17 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
@@ -178,6 +178,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
       pipelineTime: SQLMetric,
       updateInputMetrics: InputMetricsWrapper => Unit,
       updateNativeMetrics: IMetrics => Unit,
+      partitionIndex: Int,
       inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()
   ): Iterator[ColumnarBatch] = {
 
@@ -246,6 +247,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
       rootNode: PlanNode,
       pipelineTime: SQLMetric,
       updateNativeMetrics: IMetrics => Unit,
+      partitionIndex: Int,
       materializeInput: Boolean): Iterator[ColumnarBatch] = {
     // scalastyle:on argcount
     GlutenConfig.getConf
diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala
index 180957f17..f2943d31d 100644
--- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala
@@ -146,6 +146,7 @@ class IteratorApiImpl extends IteratorApi with Logging {
       pipelineTime: SQLMetric,
       updateInputMetrics: (InputMetricsWrapper) => Unit,
       updateNativeMetrics: IMetrics => Unit,
+      partitionIndex: Int,
       inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()): Iterator[ColumnarBatch] = {
     assert(
       inputPartition.isInstanceOf[GlutenPartition],
@@ -165,7 +166,8 @@ class IteratorApiImpl extends IteratorApi with Logging {
       transKernel.createKernelWithBatchIterator(
         inputPartition.plan,
         splitInfoByteArray,
-        columnarNativeIterators)
+        columnarNativeIterators,
+        partitionIndex)
     pipelineTime += TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beforeBuild)
 
     Iterators
@@ -193,6 +195,7 @@ class IteratorApiImpl extends IteratorApi with Logging {
       rootNode: PlanNode,
       pipelineTime: SQLMetric,
       updateNativeMetrics: IMetrics => Unit,
+      partitionIndex: Int,
       materializeInput: Boolean): Iterator[ColumnarBatch] = {
 
     ExecutorManager.tryTaskSet(numaBindingInfo)
@@ -207,7 +210,8 @@ class IteratorApiImpl extends IteratorApi with Logging {
         rootNode.toProtobuf.toByteArray,
         // Final iterator does not contain scan split, so pass empty split info to native here.
         new Array[Array[Byte]](0),
-        columnarNativeIterator
+        columnarNativeIterator,
+        partitionIndex
       )
 
     Iterators
diff --git a/cpp/velox/benchmarks/PlanValidatorUtil.cc b/cpp/velox/benchmarks/PlanValidatorUtil.cc
index 229804368..e299b4620 100644
--- a/cpp/velox/benchmarks/PlanValidatorUtil.cc
+++ b/cpp/velox/benchmarks/PlanValidatorUtil.cc
@@ -22,6 +22,7 @@
 #include "memory/VeloxMemoryManager.h"
 #include "substrait/SubstraitToVeloxPlanValidator.h"
 
+using namespace facebook::velox;
 using namespace gluten;
 
 /// Set spark.gluten.sql.debug=true to get validation plan and dump it into a json file,
@@ -43,7 +44,8 @@ int main(int argc, char** argv) {
   std::unordered_map<std::string, std::string> conf;
   conf.insert({kDebugModeEnabled, "true"});
   initVeloxBackend(conf);
-  core::QueryCtx queryCtx;
+  std::unordered_map<std::string, std::string> configs{{core::QueryConfig::kSparkPartitionId, "0"}};
+  core::QueryCtx queryCtx(nullptr, core::QueryConfig(configs));
   auto pool = defaultLeafVeloxMemoryPool().get();
   core::ExecCtx execCtx(pool, &queryCtx);
 
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc
index b74b8504d..1b217ebeb 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -509,6 +509,8 @@ std::unordered_map<std::string, std::string> WholeStageResultIterator::getQueryC
     // Disable driver cpu time slicing.
     configs[velox::core::QueryConfig::kDriverCpuTimeSliceLimitMs] = "0";
 
+    configs[velox::core::QueryConfig::kSparkPartitionId] = std::to_string(taskInfo_.partitionId);
+
   } catch (const std::invalid_argument& err) {
     std::string errDetails = err.what();
     throw std::runtime_error("Invalid conf arg: " + errDetails);
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 32f1bd0c8..063605001 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -110,7 +110,8 @@ Java_io_glutenproject_vectorized_PlanEvaluatorJniWrapper_nativeValidateWithFailu
   gluten::parseProtobuf(planData, planSize, &subPlan);
 
   // A query context used for function validation.
-  velox::core::QueryCtx queryCtx;
+  std::unordered_map<std::string, std::string> configs{{velox::core::QueryConfig::kSparkPartitionId, "0"}};
+  velox::core::QueryCtx queryCtx(nullptr, velox::core::QueryConfig(configs));
   auto pool = gluten::defaultLeafVeloxMemoryPool().get();
   // An execution context used for function validation.
   velox::core::ExecCtx execCtx(pool, &queryCtx);
diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala
index b68080cd9..2f506f483 100644
--- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala
@@ -60,6 +60,7 @@ trait IteratorApi {
       pipelineTime: SQLMetric,
       updateInputMetrics: (InputMetricsWrapper) => Unit,
       updateNativeMetrics: IMetrics => Unit,
+      partitionIndex: Int,
       inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()
   ): Iterator[ColumnarBatch]
 
@@ -76,6 +77,7 @@ trait IteratorApi {
       rootNode: PlanNode,
       pipelineTime: SQLMetric,
       updateNativeMetrics: IMetrics => Unit,
+      partitionIndex: Int,
       materializeInput: Boolean = false): Iterator[ColumnarBatch]
   // scalastyle:on argcount
 
diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala b/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala
index 0aa4eec7f..36eb35203 100644
--- a/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala
@@ -95,6 +95,7 @@ class GlutenWholeStageColumnarRDD(
           pipelineTime,
           updateInputMetrics,
           updateNativeMetrics,
+          split.index,
           inputIterators
         )
     }
diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageZippedPartitionsRDD.scala b/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageZippedPartitionsRDD.scala
index 6c48159f3..9a11326c5 100644
--- a/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageZippedPartitionsRDD.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageZippedPartitionsRDD.scala
@@ -55,6 +55,7 @@ class WholeStageZippedPartitionsRDD(
             resCtx.root,
             pipelineTime,
             updateNativeMetrics,
+            split.index,
             materializeInput
           )
     }
diff --git a/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java b/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java
index d81e7abee..7dc6884aa 100644
--- a/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java
+++ b/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java
@@ -61,7 +61,7 @@ public class NativePlanEvaluator {
   // Used by WholeStageTransform to create the native computing pipeline and
   // return a columnar result iterator.
   public GeneralOutIterator createKernelWithBatchIterator(
-      byte[] wsPlan, byte[][] splitInfo, List<GeneralInIterator> iterList)
+      byte[] wsPlan, byte[][] splitInfo, List<GeneralInIterator> iterList, int partitionIndex)
       throws RuntimeException, IOException {
     final AtomicReference<ColumnarBatchOutIterator> outIterator = new AtomicReference<>();
     final NativeMemoryManager nmm =
@@ -101,7 +101,7 @@ public class NativePlanEvaluator {
             splitInfo,
             iterList.toArray(new GeneralInIterator[0]),
             TaskContext.get().stageId(),
-            TaskContext.getPartitionId(),
+            partitionIndex, // TaskContext.getPartitionId(),
             TaskContext.get().taskAttemptId(),
             DebugUtil.saveInputToFile(),
             BackendsApiManager.getSparkPlanExecApiInstance().rewriteSpillPath(spillDirPath));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to