This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 5fccac12cc Add protoc support for ArrowScanExecNode (#20280) (#20284)
5fccac12cc is described below
commit 5fccac12cc6cfa2c10ce231f8486896f0e3fc115
Author: Josh Elkind <[email protected]>
AuthorDate: Thu Feb 12 04:50:23 2026 -0500
Add protoc support for ArrowScanExecNode (#20280) (#20284)
## Which issue does this PR close?
- Closes #20280.
## Rationale for this change
Physical plans that read Arrow files (.arrow / IPC) could not be
serialized or deserialized via the proto layer. PhysicalPlanNode already
had scan nodes for Parquet, CSV, JSON, Avro, and in-memory sources, but
not for Arrow, so a DataSourceExec using ArrowSource was not
round-trippable. That blocked use cases like distributing plans that
scan Arrow files (e.g. Ballista). This change adds Arrow scan to the
proto layer so those plans can be serialized and deserialized like the
other file formats.
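To make the round trip concrete, here is a minimal sketch, assuming the
datafusion_proto::bytes helpers and SessionContext::register_arrow keep their
current signatures (the table name and file path are illustrative):

```rust
use datafusion::error::Result;
use datafusion::execution::options::ArrowReadOptions;
use datafusion::prelude::SessionContext;
use datafusion_proto::bytes::{physical_plan_from_bytes, physical_plan_to_bytes};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Register an Arrow IPC file as a table (the path is illustrative).
    ctx.register_arrow("t", "/path/to/file.arrow", ArrowReadOptions::default())
        .await?;
    let plan = ctx.sql("SELECT * FROM t").await?.create_physical_plan().await?;

    // Serialize the physical plan to protobuf bytes and decode it back.
    // Before this change the ArrowSource scan had no proto representation,
    // so this round trip failed for plans that scan Arrow files.
    let bytes = physical_plan_to_bytes(plan.clone())?;
    let restored = physical_plan_from_bytes(&bytes, &ctx)?;
    assert_eq!(format!("{plan:?}"), format!("{restored:?}"));
    Ok(())
}
```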
## What changes are included in this PR?
- Proto: added ArrowScanExecNode (with a FileScanExecConf base_conf) and
arrow_scan = 38 to the PhysicalPlanNode oneof in datafusion.proto.
- Generated code: updated prost.rs and pbjson.rs to include ArrowScanExecNode
and the ArrowScan variant (manual edits; protoc was not run).
- To-proto: in try_from_data_source_exec, a data source that is a
FileScanConfig whose file source is ArrowSource is now serialized as an
ArrowScanExecNode.
- From-proto: implemented try_into_arrow_scan_physical_plan to deserialize
ArrowScanExecNode into a DataSourceExec backed by ArrowSource; a missing
base_conf returns an explicit error (no .unwrap()).
- Test: added roundtrip_arrow_scan in roundtrip_physical_plan.rs to assert
that Arrow scan plans round-trip correctly.
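For illustration, a minimal sketch of what the hand-edited pbjson impls
provide, assuming datafusion-proto is built with its serde/JSON support and
serde_json is available (the empty node is only to keep the example
self-contained):

```rust
use datafusion_proto::protobuf::ArrowScanExecNode;

fn main() -> Result<(), serde_json::Error> {
    // An ArrowScanExecNode with base_conf unset: optional fields are
    // skipped during serialization, so this produces an empty JSON object.
    let node = ArrowScanExecNode { base_conf: None };
    let json = serde_json::to_string(&node)?;
    assert_eq!(json, "{}");

    // The generated deserializer accepts both "baseConf" and "base_conf",
    // matching the field matcher in the pbjson.rs diff below.
    let back: ArrowScanExecNode = serde_json::from_str(&json)?;
    assert_eq!(node, back);
    Ok(())
}
```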
## Are these changes tested?
Yes. A new test roundtrip_arrow_scan builds a physical plan that scans
Arrow files, serializes it to bytes and deserializes it back, and
asserts the round-tripped plan matches the original. The full cargo test
-p datafusion-proto suite (150 tests: unit, integration, and doc tests)
passes, including all existing roundtrip and serialization tests.
## Are there any user-facing changes?
No. This only extends the existing physical-plan proto support to Arrow
scan. Callers that already serialize/deserialize physical plans (e.g.
for distributed execution) can now round-trip plans that read Arrow
files in addition to Parquet, CSV, JSON, and Avro, with no API or
behavioral changes for existing usage.
---
datafusion/proto/proto/datafusion.proto | 5 +
datafusion/proto/src/generated/pbjson.rs | 106 +++++++++++++++++++++
datafusion/proto/src/generated/prost.rs | 9 +-
datafusion/proto/src/physical_plan/mod.rs | 42 ++++++++
.../proto/tests/cases/roundtrip_physical_plan.rs | 28 +++++-
5 files changed, 187 insertions(+), 3 deletions(-)
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 67c6d5ae16..7c02688676 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -751,6 +751,7 @@ message PhysicalPlanNode {
MemoryScanExecNode memory_scan = 35;
AsyncFuncExecNode async_func = 36;
BufferExecNode buffer = 37;
+ ArrowScanExecNode arrow_scan = 38;
}
}
@@ -1106,6 +1107,10 @@ message AvroScanExecNode {
FileScanExecConf base_conf = 1;
}
+message ArrowScanExecNode {
+ FileScanExecConf base_conf = 1;
+}
+
message MemoryScanExecNode {
repeated bytes partitions = 1;
datafusion_common.Schema schema = 2;
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index b77060394f..5b2b9133ce 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -1298,6 +1298,98 @@ impl<'de> serde::Deserialize<'de> for AnalyzedLogicalPlanType {
deserializer.deserialize_struct("datafusion.AnalyzedLogicalPlanType", FIELDS, GeneratedVisitor)
}
}
+impl serde::Serialize for ArrowScanExecNode {
+ #[allow(deprecated)]
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+ where
+ S: serde::Serializer,
+ {
+ use serde::ser::SerializeStruct;
+ let mut len = 0;
+ if self.base_conf.is_some() {
+ len += 1;
+ }
+ let mut struct_ser = serializer.serialize_struct("datafusion.ArrowScanExecNode", len)?;
+ if let Some(v) = self.base_conf.as_ref() {
+ struct_ser.serialize_field("baseConf", v)?;
+ }
+ struct_ser.end()
+ }
+}
+impl<'de> serde::Deserialize<'de> for ArrowScanExecNode {
+ #[allow(deprecated)]
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ const FIELDS: &[&str] = &[
+ "base_conf",
+ "baseConf",
+ ];
+
+ #[allow(clippy::enum_variant_names)]
+ enum GeneratedField {
+ BaseConf,
+ }
+ impl<'de> serde::Deserialize<'de> for GeneratedField {
+ fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ struct GeneratedVisitor;
+
+ impl serde::de::Visitor<'_> for GeneratedVisitor {
+ type Value = GeneratedField;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(formatter, "expected one of: {:?}", &FIELDS)
+ }
+
+ #[allow(unused_variables)]
+ fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
+ where
+ E: serde::de::Error,
+ {
+ match value {
+ "baseConf" | "base_conf" =>
Ok(GeneratedField::BaseConf),
+ _ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
+ }
+ }
+ }
+ deserializer.deserialize_identifier(GeneratedVisitor)
+ }
+ }
+ struct GeneratedVisitor;
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = ArrowScanExecNode;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ formatter.write_str("struct datafusion.ArrowScanExecNode")
+ }
+
+ fn visit_map<V>(self, mut map_: V) -> std::result::Result<ArrowScanExecNode, V::Error>
+ where
+ V: serde::de::MapAccess<'de>,
+ {
+ let mut base_conf__ = None;
+ while let Some(k) = map_.next_key()? {
+ match k {
+ GeneratedField::BaseConf => {
+ if base_conf__.is_some() {
+ return Err(serde::de::Error::duplicate_field("baseConf"));
+ }
+ base_conf__ = map_.next_value()?;
+ }
+ }
+ }
+ Ok(ArrowScanExecNode {
+ base_conf: base_conf__,
+ })
+ }
+ }
+ deserializer.deserialize_struct("datafusion.ArrowScanExecNode", FIELDS, GeneratedVisitor)
+ }
+}
impl serde::Serialize for AsyncFuncExecNode {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
@@ -17783,6 +17875,9 @@ impl serde::Serialize for PhysicalPlanNode {
physical_plan_node::PhysicalPlanType::Buffer(v) => {
struct_ser.serialize_field("buffer", v)?;
}
+ physical_plan_node::PhysicalPlanType::ArrowScan(v) => {
+ struct_ser.serialize_field("arrowScan", v)?;
+ }
}
}
struct_ser.end()
@@ -17851,6 +17946,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode {
"async_func",
"asyncFunc",
"buffer",
+ "arrow_scan",
+ "arrowScan",
];
#[allow(clippy::enum_variant_names)]
@@ -17891,6 +17988,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode {
MemoryScan,
AsyncFunc,
Buffer,
+ ArrowScan,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -17948,6 +18046,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode {
"memoryScan" | "memory_scan" =>
Ok(GeneratedField::MemoryScan),
"asyncFunc" | "async_func" =>
Ok(GeneratedField::AsyncFunc),
"buffer" => Ok(GeneratedField::Buffer),
+ "arrowScan" | "arrow_scan" =>
Ok(GeneratedField::ArrowScan),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -18220,6 +18319,13 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode {
return Err(serde::de::Error::duplicate_field("buffer"));
}
physical_plan_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_plan_node::PhysicalPlanType::Buffer)
+;
+ }
+ GeneratedField::ArrowScan => {
+ if physical_plan_type__.is_some() {
+ return Err(serde::de::Error::duplicate_field("arrowScan"));
+ }
+ physical_plan_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_plan_node::PhysicalPlanType::ArrowScan)
;
}
}
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index e95cddcc2c..d9602665c2 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1079,7 +1079,7 @@ pub mod table_reference {
pub struct PhysicalPlanNode {
#[prost(
oneof = "physical_plan_node::PhysicalPlanType",
- tags = "1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37"
+ tags = "1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38"
)]
pub physical_plan_type: ::core::option::Option<physical_plan_node::PhysicalPlanType>,
}
@@ -1161,6 +1161,8 @@ pub mod physical_plan_node {
AsyncFunc(::prost::alloc::boxed::Box<super::AsyncFuncExecNode>),
#[prost(message, tag = "37")]
Buffer(::prost::alloc::boxed::Box<super::BufferExecNode>),
+ #[prost(message, tag = "38")]
+ ArrowScan(super::ArrowScanExecNode),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -1669,6 +1671,11 @@ pub struct AvroScanExecNode {
pub base_conf: ::core::option::Option<FileScanExecConf>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ArrowScanExecNode {
+ #[prost(message, optional, tag = "1")]
+ pub base_conf: ::core::option::Option<FileScanExecConf>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
pub struct MemoryScanExecNode {
#[prost(bytes = "vec", repeated, tag = "1")]
pub partitions: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 60d8b5705b..bfba715b91 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -34,6 +34,7 @@ use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::sink::DataSinkExec;
use datafusion_datasource::source::{DataSource, DataSourceExec};
+use datafusion_datasource_arrow::source::ArrowSource;
#[cfg(feature = "avro")]
use datafusion_datasource_avro::source::AvroSource;
use datafusion_datasource_csv::file_format::CsvSink;
@@ -199,6 +200,9 @@ impl protobuf::PhysicalPlanNode {
PhysicalPlanType::MemoryScan(scan) => {
self.try_into_memory_scan_physical_plan(scan, ctx, codec, proto_converter)
}
+ PhysicalPlanType::ArrowScan(scan) => {
+ self.try_into_arrow_scan_physical_plan(scan, ctx, codec, proto_converter)
+ }
PhysicalPlanType::CoalesceBatches(coalesce_batches) => self
.try_into_coalesce_batches_physical_plan(
coalesce_batches,
@@ -774,6 +778,27 @@ impl protobuf::PhysicalPlanNode {
Ok(DataSourceExec::from_data_source(scan_conf))
}
+ fn try_into_arrow_scan_physical_plan(
+ &self,
+ scan: &protobuf::ArrowScanExecNode,
+ ctx: &TaskContext,
+ codec: &dyn PhysicalExtensionCodec,
+ proto_converter: &dyn PhysicalProtoConverterExtension,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let base_conf = scan.base_conf.as_ref().ok_or_else(|| {
+ internal_datafusion_err!("base_conf in ArrowScanExecNode is
missing.")
+ })?;
+ let table_schema = parse_table_schema_from_proto(base_conf)?;
+ let scan_conf = parse_protobuf_file_scan_config(
+ base_conf,
+ ctx,
+ codec,
+ proto_converter,
+ Arc::new(ArrowSource::new_file_source(table_schema)),
+ )?;
+ Ok(DataSourceExec::from_data_source(scan_conf))
+ }
+
#[cfg_attr(not(feature = "parquet"), expect(unused_variables))]
fn try_into_parquet_scan_physical_plan(
&self,
@@ -2867,6 +2892,23 @@ impl protobuf::PhysicalPlanNode {
}
}
+ if let Some(scan_conf) = data_source.as_any().downcast_ref::<FileScanConfig>() {
+ let source = scan_conf.file_source();
+ if let Some(_arrow_source) = source.as_any().downcast_ref::<ArrowSource>() {
+ return Ok(Some(protobuf::PhysicalPlanNode {
+ physical_plan_type: Some(PhysicalPlanType::ArrowScan(
+ protobuf::ArrowScanExecNode {
+ base_conf: Some(serialize_file_scan_config(
+ scan_conf,
+ codec,
+ proto_converter,
+ )?),
+ },
+ )),
+ }));
+ }
+ }
+
#[cfg(feature = "parquet")]
if let Some((maybe_parquet, conf)) = data_source_exec.downcast_to_file_source::<ParquetSource>()
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index fd4de81140..bc310150d8 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -36,8 +36,8 @@ use datafusion::datasource::listing::{
};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
- FileGroup, FileOutputMode, FileScanConfigBuilder, FileSinkConfig, ParquetSource,
- wrap_partition_type_in_dict, wrap_partition_value_in_dict,
+ ArrowSource, FileGroup, FileOutputMode, FileScanConfigBuilder, FileSinkConfig,
+ ParquetSource, wrap_partition_type_in_dict, wrap_partition_value_in_dict,
};
use datafusion::datasource::sink::DataSinkExec;
use datafusion::datasource::source::DataSourceExec;
@@ -929,6 +929,30 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
roundtrip_test(DataSourceExec::from_data_source(scan_config))
}
+#[test]
+fn roundtrip_arrow_scan() -> Result<()> {
+ let file_schema =
+ Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
+
+ let table_schema = TableSchema::new(file_schema.clone(), vec![]);
+ let file_source = Arc::new(ArrowSource::new_file_source(table_schema));
+
+ let scan_config =
+ FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
+ .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
+ "/path/to/file.arrow".to_string(),
+ 1024,
+ )])])
+ .with_statistics(Statistics {
+ num_rows: Precision::Inexact(100),
+ total_byte_size: Precision::Inexact(1024),
+ column_statistics: Statistics::unknown_column(&file_schema),
+ })
+ .build();
+
+ roundtrip_test(DataSourceExec::from_data_source(scan_config))
+}
+
#[tokio::test]
async fn roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> {
let mut file_group =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]