This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 19dd58d4 chore: Improve CometScan metrics (#1100)
19dd58d4 is described below
commit 19dd58d41b8e63ab7cf05480165a78375745cb35
Author: Andy Grove <[email protected]>
AuthorDate: Wed Nov 20 14:45:37 2024 -0700
chore: Improve CometScan metrics (#1100)
* Add native metrics for plan creation
* make messages consistent
* Include get_next_batch cost in metrics
* formatting
* fix double count of rows
---
native/core/src/execution/jni_api.rs | 22 +++++++++++++++++++--
native/core/src/execution/operators/scan.rs | 23 +++++++++++-----------
.../scala/org/apache/comet/CometExecIterator.scala | 3 ++-
spark/src/main/scala/org/apache/comet/Native.scala | 11 ++++++++++-
4 files changed, 44 insertions(+), 15 deletions(-)
diff --git a/native/core/src/execution/jni_api.rs
b/native/core/src/execution/jni_api.rs
index 47d87fe1..448f383c 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -37,6 +37,7 @@ use jni::{
sys::{jbyteArray, jint, jlong, jlongArray},
JNIEnv,
};
+use std::time::{Duration, Instant};
use std::{collections::HashMap, sync::Arc, task::Poll};
use super::{serde, utils::SparkArrowConvert, CometMemoryPool};
@@ -81,6 +82,8 @@ struct ExecutionContext {
pub runtime: Runtime,
/// Native metrics
pub metrics: Arc<GlobalRef>,
+ /// The time it took to create the native plan and configure the context
+ pub plan_creation_time: Duration,
/// DataFusion SessionContext
pub session_ctx: Arc<SessionContext>,
/// Whether to enable additional debugging checks & messages
@@ -109,6 +112,8 @@ pub unsafe extern "system" fn
Java_org_apache_comet_Native_createPlan(
// Init JVM classes
JVMClasses::init(&mut env);
+ let start = Instant::now();
+
let array = unsafe { JPrimitiveArray::from_raw(serialized_query) };
let bytes = env.convert_byte_array(array)?;
@@ -167,6 +172,8 @@ pub unsafe extern "system" fn
Java_org_apache_comet_Native_createPlan(
// dictionaries will be dropped as well.
let session = prepare_datafusion_session_context(&configs,
task_memory_manager)?;
+ let plan_creation_time = start.elapsed();
+
let exec_context = Box::new(ExecutionContext {
id,
spark_plan,
@@ -177,6 +184,7 @@ pub unsafe extern "system" fn
Java_org_apache_comet_Native_createPlan(
conf: configs,
runtime,
metrics,
+ plan_creation_time,
session_ctx: Arc::new(session),
debug_native,
explain_native,
@@ -321,6 +329,8 @@ fn pull_input_batches(exec_context: &mut ExecutionContext)
-> Result<(), CometEr
pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
e: JNIEnv,
_class: JClass,
+ stage_id: jint,
+ partition: jint,
exec_context: jlong,
array_addrs: jlongArray,
schema_addrs: jlongArray,
@@ -335,20 +345,23 @@ pub unsafe extern "system" fn
Java_org_apache_comet_Native_executePlan(
// Because we don't know if input arrays are dictionary-encoded when
we create
// query plan, we need to defer stream initialization to first time
execution.
if exec_context.root_op.is_none() {
+ let start = Instant::now();
let planner =
PhysicalPlanner::new(Arc::clone(&exec_context.session_ctx))
.with_exec_id(exec_context_id);
let (scans, root_op) = planner.create_plan(
&exec_context.spark_plan,
&mut exec_context.input_sources.clone(),
)?;
+ let physical_plan_time = start.elapsed();
+ exec_context.plan_creation_time += physical_plan_time;
exec_context.root_op = Some(Arc::clone(&root_op));
exec_context.scans = scans;
if exec_context.explain_native {
let formatted_plan_str =
DisplayableExecutionPlan::new(root_op.as_ref()).indent(true);
- info!("Comet native query plan:\n {formatted_plan_str:}");
+ info!("Comet native query plan:\n{formatted_plan_str:}");
}
let task_ctx = exec_context.session_ctx.task_ctx();
@@ -388,7 +401,12 @@ pub unsafe extern "system" fn
Java_org_apache_comet_Native_executePlan(
if let Some(plan) = &exec_context.root_op {
let formatted_plan_str =
DisplayableExecutionPlan::with_metrics(plan.as_ref()).indent(true);
- info!("Comet native query plan with
metrics:\n{formatted_plan_str:}");
+ info!(
+ "Comet native query plan with metrics:\
+ \n[Stage {} Partition {}] plan creation (including
CometScans fetching first batches) took {:?}:\
+ \n{formatted_plan_str:}",
+ stage_id, partition,
exec_context.plan_creation_time
+ );
}
}
diff --git a/native/core/src/execution/operators/scan.rs
b/native/core/src/execution/operators/scan.rs
index b2546f83..a9bd954e 100644
--- a/native/core/src/execution/operators/scan.rs
+++ b/native/core/src/execution/operators/scan.rs
@@ -15,16 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use futures::Stream;
-use itertools::Itertools;
-use std::rc::Rc;
-use std::{
- any::Any,
- pin::Pin,
- sync::{Arc, Mutex},
- task::{Context, Poll},
-};
-
use crate::{
errors::CometError,
execution::{
@@ -48,9 +38,18 @@ use datafusion::{
physical_plan::{ExecutionPlan, *},
};
use datafusion_common::{arrow_datafusion_err, DataFusionError, Result as
DataFusionResult};
+use futures::Stream;
+use itertools::Itertools;
use jni::objects::JValueGen;
use jni::objects::{GlobalRef, JObject};
use jni::sys::jsize;
+use std::rc::Rc;
+use std::{
+ any::Any,
+ pin::Pin,
+ sync::{Arc, Mutex},
+ task::{Context, Poll},
+};
/// ScanExec reads batches of data from Spark via JNI. The source of the scan
could be a file
/// scan or the result of reading a broadcast or shuffle exchange.
@@ -98,7 +97,6 @@ impl ScanExec {
let batch =
ScanExec::get_next(exec_context_id, input_source.as_obj(),
data_types.len())?;
timer.stop();
- baseline_metrics.record_output(batch.num_rows());
batch
} else {
InputBatch::EOF
@@ -162,6 +160,7 @@ impl ScanExec {
// This is a unit test. We don't need to call JNI.
return Ok(());
}
+ let mut timer = self.baseline_metrics.elapsed_compute().timer();
let mut current_batch = self.batch.try_lock().unwrap();
if current_batch.is_none() {
@@ -173,6 +172,8 @@ impl ScanExec {
*current_batch = Some(next_batch);
}
+ timer.stop();
+
Ok(())
}
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index b1f22726..8a349bd3 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -95,7 +95,8 @@ class CometExecIterator(
nativeUtil.getNextBatch(
numOutputCols,
(arrayAddrs, schemaAddrs) => {
- nativeLib.executePlan(plan, arrayAddrs, schemaAddrs)
+ val ctx = TaskContext.get()
+ nativeLib.executePlan(ctx.stageId(), ctx.partitionId(), plan,
arrayAddrs, schemaAddrs)
})
}
diff --git a/spark/src/main/scala/org/apache/comet/Native.scala
b/spark/src/main/scala/org/apache/comet/Native.scala
index 03a9dea0..52063419 100644
--- a/spark/src/main/scala/org/apache/comet/Native.scala
+++ b/spark/src/main/scala/org/apache/comet/Native.scala
@@ -56,6 +56,10 @@ class Native extends NativeBase {
/**
* Execute a native query plan based on given input Arrow arrays.
*
+ * @param stage
+ * the stage ID, for informational purposes
+ * @param partition
+ * the partition ID, for informational purposes
* @param plan
* the address to native query plan.
* @param arrayAddrs
@@ -65,7 +69,12 @@ class Native extends NativeBase {
* @return
* the number of rows, if -1, it means end of the output.
*/
- @native def executePlan(plan: Long, arrayAddrs: Array[Long], schemaAddrs:
Array[Long]): Long
+ @native def executePlan(
+ stage: Int,
+ partition: Int,
+ plan: Long,
+ arrayAddrs: Array[Long],
+ schemaAddrs: Array[Long]): Long
/**
* Release and drop the native query plan object and context object.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]