alamb commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1219634874
##########
datafusion/core/src/test/exec.rs:
##########
@@ -643,3 +637,139 @@ pub async fn
assert_strong_count_converges_to_zero<T>(refs: Weak<T>) {
.await
.unwrap();
}
+
+/// Execution plan that emits streams that panics.
+///
+/// This is useful to test panic handling of certain execution plans.
+#[derive(Debug)]
+pub struct PanicingExec {
+ /// Schema that is mocked by this plan.
+ schema: SchemaRef,
+
+ /// Number of output partitions. Each partition will produce this
+ /// many empty output record batches prior to panicing
+ batches_until_panics: Vec<usize>,
+}
+
+impl PanicingExec {
+ /// Create new [`PanickingExec`] with a give schema and number of
+ /// partitions, which will each panic immediately.
+ pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
+ Self {
+ schema,
+ batches_until_panics: vec![0; n_partitions],
+ }
+ }
+
+ /// Set the number of batches prior to panic for a partition
+ pub fn with_partition_panic(mut self, partition: usize, count: usize) ->
Self {
+ self.batches_until_panics[partition] = count;
+ self
+ }
+}
+
+impl ExecutionPlan for PanicingExec {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ Arc::clone(&self.schema)
+ }
+
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ // this is a leaf node and has no children
+ vec![]
+ }
+
+ fn output_partitioning(&self) -> Partitioning {
+ let num_partitions = self.batches_until_panics.len();
+ Partitioning::UnknownPartitioning(num_partitions)
+ }
+
+ fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+ None
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ _: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Err(DataFusionError::Internal(format!(
+ "Children cannot be replaced in {:?}",
+ self
+ )))
+ }
+
+ fn execute(
+ &self,
+ partition: usize,
+ _context: Arc<TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ Ok(Box::pin(PanicingStream {
+ partition,
+ batches_until_panic: self.batches_until_panics[partition],
+ schema: Arc::clone(&self.schema),
+ ready: false,
+ }))
+ }
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(f, "PanickingExec",)
+ }
+ }
+ }
+
+ fn statistics(&self) -> Statistics {
+ unimplemented!()
+ }
+}
+
+/// A [`RecordBatchStream`] that yields every other batch and panics after
`batches_until_panic` batches have been produced
+#[derive(Debug)]
+struct PanicingStream {
Review Comment:
Changed to `PanicStream`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]