This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new c67161b97 Adds optional serde support to datafusion-proto (#2892)
c67161b97 is described below
commit c67161b97731d71ca545231890bc159fab6439be
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Sun Jul 17 14:28:29 2022 -0400
Adds optional serde support to datafusion-proto (#2892)
* Add optional serde support to datafusion-proto (#2889)
* Add public methods for JSON serde (#64)
* Misc suggestions
* Update datafusion/proto/Cargo.toml
Co-authored-by: Raphael Taylor-Davies <[email protected]>
Co-authored-by: Raphael Taylor-Davies <[email protected]>
* Fixes
* Fixup Cargo.toml
* Format Cargo.toml
Co-authored-by: Brent Gardner <[email protected]>
---
.github/workflows/rust.yml | 4 +--
datafusion/proto/Cargo.toml | 12 +++++--
datafusion/proto/build.rs | 38 ++++++++++++++++++++--
datafusion/proto/src/bytes/mod.rs | 46 ++++++++++++++++++++++++++
datafusion/proto/src/lib.rs | 68 ++++++++++++++++++++++++---------------
5 files changed, 134 insertions(+), 34 deletions(-)
diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index b534ed224..cd24c7f08 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -77,7 +77,7 @@ jobs:
CARGO_TARGET_DIR: "/github/home/target"
- name: Check Workspace builds with all features
run: |
- cargo check --workspace --benches --features avro,jit,scheduler
+ cargo check --workspace --benches --features avro,jit,scheduler,json # json: optional serde/JSON support in datafusion-proto
env:
CARGO_HOME: "/github/home/.cargo"
CARGO_TARGET_DIR: "/github/home/target"
@@ -121,7 +121,7 @@ jobs:
run: |
export ARROW_TEST_DATA=$(pwd)/testing/data
export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data
- cargo test --features avro,jit,scheduler
+ cargo test --features avro,jit,scheduler,json # json: optional serde/JSON support in datafusion-proto
# test datafusion-sql examples
cargo run --example sql
# test datafusion examples
diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml
index 81481a6ef..d3f70b2c0 100644
--- a/datafusion/proto/Cargo.toml
+++ b/datafusion/proto/Cargo.toml
@@ -24,7 +24,7 @@ repository = "https://github.com/apache/arrow-datafusion"
 readme = "README.md"
 authors = ["Apache Arrow <dev@arrow.apache.org>"]
 license = "Apache-2.0"
-keywords = [ "arrow", "query", "sql" ]
+keywords = ["arrow", "query", "sql"]
 edition = "2021"
 rust-version = "1.58"
 
@@ -33,18 +33,24 @@ name = "datafusion_proto"
 path = "src/lib.rs"
 
 [features]
+default = []
+json = ["pbjson", "pbjson-build", "pbjson-types", "serde", "serde_json"] # serde/JSON support for the generated protobuf types
 
 [dependencies]
 arrow = { version = "18.0.0" }
 datafusion = { path = "../core", version = "10.0.0" }
 datafusion-common = { path = "../common", version = "10.0.0" }
 datafusion-expr = { path = "../expr", version = "10.0.0" }
+pbjson = { version = "0.3", optional = true }
+pbjson-types = { version = "0.3", optional = true }
 prost = "0.10"
-
+serde = { version = "1.0", optional = true }
+serde_json = { version = "1.0", optional = true }
 
 [dev-dependencies]
 doc-comment = "0.3"
 tokio = "1.18"
 
 [build-dependencies]
-tonic-build = { version = "0.7" }
+pbjson-build = { version = "0.3", optional = true }
+prost-build = { version = "0.10" } # keep in step with the `prost` runtime version above
diff --git a/datafusion/proto/build.rs b/datafusion/proto/build.rs
index 414593eee..e13ffa86a 100644
--- a/datafusion/proto/build.rs
+++ b/datafusion/proto/build.rs
@@ -15,12 +15,44 @@
 // specific language governing permissions and limitations
 // under the License.
 
+// Generates the prost bindings for `proto/datafusion.proto` into OUT_DIR and, with
+// the `json` feature, pbjson serde impls for the `.datafusion` package as well.
 fn main() -> Result<(), String> {
     // for use in docker build where file changes can be wonky
     println!("cargo:rerun-if-env-changed=FORCE_REBUILD");
-
     println!("cargo:rerun-if-changed=proto/datafusion.proto");
-    tonic_build::configure()
-        .compile(&["proto/datafusion.proto"], &["proto"])
+
+    build()?;
+
+    Ok(())
+}
+
+/// `json` build: prost codegen, then pbjson serde impls from the descriptor set.
+#[cfg(feature = "json")]
+fn build() -> Result<(), String> {
+    let out_dir = std::env::var("OUT_DIR").map_err(|e| format!("OUT_DIR: {}", e))?;
+    let descriptor_path = std::path::PathBuf::from(out_dir).join("proto_descriptor.bin");
+    prost_build::Config::new()
+        .file_descriptor_set_path(&descriptor_path)
+        .compile_well_known_types()
+        .extern_path(".google.protobuf", "::pbjson_types")
+        .compile_protos(&["proto/datafusion.proto"], &["proto"])
+        .map_err(|e| format!("protobuf compilation failed: {}", e))?;
+
+    let descriptor_set = std::fs::read(&descriptor_path)
+        .map_err(|e| format!("reading {}: {}", descriptor_path.display(), e))?;
+    pbjson_build::Builder::new()
+        .register_descriptors(&descriptor_set)
+        .map_err(|e| format!("registering descriptors failed: {}", e))?
+        .build(&[".datafusion"])
+        .map_err(|e| format!("pbjson compilation failed: {}", e))?;
+    Ok(())
+}
+
+/// Default build: plain prost codegen without serde support.
+#[cfg(not(feature = "json"))]
+fn build() -> Result<(), String> {
+    prost_build::Config::new()
+        .compile_protos(&["proto/datafusion.proto"], &["proto"])
         .map_err(|e| format!("protobuf compilation failed: {}", e))
 }
diff --git a/datafusion/proto/src/bytes/mod.rs b/datafusion/proto/src/bytes/mod.rs
index 37374b3ef..6ddaecae0 100644
--- a/datafusion/proto/src/bytes/mod.rs
+++ b/datafusion/proto/src/bytes/mod.rs
@@ -107,6 +107,19 @@ pub fn logical_plan_to_bytes(plan: &LogicalPlan) -> Result<Bytes> {
     logical_plan_to_bytes_with_extension_codec(plan, &extension_codec)
 }
 
+/// Serialize a LogicalPlan as json, via its protobuf representation
+#[cfg(feature = "json")]
+pub fn logical_plan_to_json(plan: &LogicalPlan) -> Result<String> {
+    let extension_codec = DefaultExtensionCodec {};
+    let protobuf =
+        protobuf::LogicalPlanNode::try_from_logical_plan(plan, &extension_codec)
+            .map_err(|e| {
+                DataFusionError::Plan(format!("Error serializing plan: {}", e))
+            })?;
+    serde_json::to_string(&protobuf)
+        .map_err(|e| DataFusionError::Plan(format!("Error serializing plan: {}", e)))
+}
+
 /// Serialize a LogicalPlan as bytes, using the provided extension codec
 pub fn logical_plan_to_bytes_with_extension_codec(
     plan: &LogicalPlan,
@@ -121,6 +134,14 @@ pub fn logical_plan_to_bytes_with_extension_codec(
     Ok(buffer.into())
 }
 
+/// Deserialize a LogicalPlan from json; malformed input yields `DataFusionError::Plan`
+#[cfg(feature = "json")]
+pub fn logical_plan_from_json(json: &str, ctx: &SessionContext) -> Result<LogicalPlan> {
+    let back: protobuf::LogicalPlanNode = serde_json::from_str(json)
+        .map_err(|e| DataFusionError::Plan(format!("Error deserializing plan: {}", e)))?;
+    back.try_into_logical_plan(ctx, &DefaultExtensionCodec {})
+}
+
 /// Deserialize a LogicalPlan from bytes
 pub fn logical_plan_from_bytes(
     bytes: &[u8],
@@ -183,6 +204,31 @@ mod test {
         Expr::from_bytes(b"Leet").unwrap();
     }
 
+    #[test]
+    #[cfg(feature = "json")]
+    fn plan_to_json() {
+        use datafusion_common::DFSchema;
+        use datafusion_expr::logical_plan::EmptyRelation;
+
+        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
+            produce_one_row: false,
+            schema: Arc::new(DFSchema::empty()),
+        });
+        let actual = logical_plan_to_json(&plan).unwrap();
+        let expected = r#"{"emptyRelation":{}}"#.to_string();
+        assert_eq!(actual, expected);
+    }
+
+    #[test]
+    #[cfg(feature = "json")]
+    fn json_to_plan() {
+        let ctx = SessionContext::new();
+        let actual = logical_plan_from_json(r#"{"emptyRelation":{}}"#, &ctx).unwrap();
+        let result = matches!(actual, LogicalPlan::EmptyRelation(_));
+        assert!(result, "Should parse empty relation");
+        assert!(logical_plan_from_json("not json", &ctx).is_err());
+    }
+
     #[test]
     fn udf_roundtrip_with_registry() {
         let ctx = context_with_udf();
diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs
index 0aa00bc75..4683abd7a 100644
--- a/datafusion/proto/src/lib.rs
+++ b/datafusion/proto/src/lib.rs
@@ -23,6 +23,9 @@ use datafusion_common::DataFusionError;
#[allow(clippy::all)]
pub mod protobuf {
include!(concat!(env!("OUT_DIR"), "/datafusion.rs"));
+
+ #[cfg(feature = "json")]
+ include!(concat!(env!("OUT_DIR"), "/datafusion.serde.rs")); // pbjson serde impls, generated by build.rs under `json`
}
pub mod bytes;
@@ -75,19 +78,32 @@ mod roundtrip_tests {
use std::fmt::Formatter;
use std::sync::Arc;
+ #[cfg(feature = "json")] // serde_json round trip must reproduce an equal proto
+ fn roundtrip_json_test(proto: &protobuf::LogicalExprNode) {
+ let string = serde_json::to_string(proto).unwrap();
+ let back: protobuf::LogicalExprNode =
serde_json::from_str(&string).unwrap();
+ assert_eq!(proto, &back);
+ }
+
+ #[cfg(not(feature = "json"))] // no-op stand-in so callers need no cfg of their own
+ fn roundtrip_json_test(_proto: &protobuf::LogicalExprNode) {}
+
// Given a DataFusion logical Expr, convert it to protobuf and back, using
debug formatting to test
// equality.
- macro_rules! roundtrip_expr_test {
- ($initial_struct:ident, $ctx:ident) => {
- let proto: protobuf::LogicalExprNode =
(&$initial_struct).try_into().unwrap();
+ fn roundtrip_expr_test<T, E>(initial_struct: T, ctx: SessionContext)
+ where
+ for<'a> &'a T: TryInto<protobuf::LogicalExprNode, Error = E> + Debug,
+ E: Debug,
+ {
+ let proto: protobuf::LogicalExprNode =
(&initial_struct).try_into().unwrap();
+ let round_trip: Expr = parse_expr(&proto, &ctx).unwrap();
- let round_trip: Expr = parse_expr(&proto, &$ctx).unwrap();
+ assert_eq!(
+ format!("{:?}", &initial_struct),
+ format!("{:?}", round_trip)
+ );
- assert_eq!(
- format!("{:?}", $initial_struct),
- format!("{:?}", round_trip)
- );
- };
+ roundtrip_json_test(&proto); // additionally checks the proto survives JSON (no-op without `json`)
}
fn new_box_field(name: &str, dt: DataType, nullable: bool) -> Box<Field> {
@@ -807,7 +823,7 @@ mod roundtrip_tests {
let test_expr = Expr::Not(Box::new(lit(1.0_f32)));
let ctx = SessionContext::new();
- roundtrip_expr_test!(test_expr, ctx);
+ roundtrip_expr_test(test_expr, ctx);
}
#[test]
@@ -815,7 +831,7 @@ mod roundtrip_tests {
let test_expr = Expr::IsNull(Box::new(col("id")));
let ctx = SessionContext::new();
- roundtrip_expr_test!(test_expr, ctx);
+ roundtrip_expr_test(test_expr, ctx);
}
#[test]
@@ -823,7 +839,7 @@ mod roundtrip_tests {
let test_expr = Expr::IsNotNull(Box::new(col("id")));
let ctx = SessionContext::new();
- roundtrip_expr_test!(test_expr, ctx);
+ roundtrip_expr_test(test_expr, ctx);
}
#[test]
@@ -836,7 +852,7 @@ mod roundtrip_tests {
};
let ctx = SessionContext::new();
- roundtrip_expr_test!(test_expr, ctx);
+ roundtrip_expr_test(test_expr, ctx);
}
#[test]
@@ -848,7 +864,7 @@ mod roundtrip_tests {
};
let ctx = SessionContext::new();
- roundtrip_expr_test!(test_expr, ctx);
+ roundtrip_expr_test(test_expr, ctx);
}
#[test]
@@ -859,7 +875,7 @@ mod roundtrip_tests {
};
let ctx = SessionContext::new();
- roundtrip_expr_test!(test_expr, ctx);
+ roundtrip_expr_test(test_expr, ctx);
}
#[test]
@@ -871,7 +887,7 @@ mod roundtrip_tests {
};
let ctx = SessionContext::new();
- roundtrip_expr_test!(test_expr, ctx);
+ roundtrip_expr_test(test_expr, ctx);
}
#[test]
@@ -879,7 +895,7 @@ mod roundtrip_tests {
let test_expr = Expr::Negative(Box::new(lit(1.0_f32)));
let ctx = SessionContext::new();
- roundtrip_expr_test!(test_expr, ctx);
+ roundtrip_expr_test(test_expr, ctx);
}
#[test]
@@ -891,7 +907,7 @@ mod roundtrip_tests {
};
let ctx = SessionContext::new();
- roundtrip_expr_test!(test_expr, ctx);
+ roundtrip_expr_test(test_expr, ctx);
}
#[test]
@@ -899,7 +915,7 @@ mod roundtrip_tests {
let test_expr = Expr::Wildcard;
let ctx = SessionContext::new();
- roundtrip_expr_test!(test_expr, ctx);
+ roundtrip_expr_test(test_expr, ctx);
}
#[test]
@@ -909,7 +925,7 @@ mod roundtrip_tests {
args: vec![col("col")],
};
let ctx = SessionContext::new();
- roundtrip_expr_test!(test_expr, ctx);
+ roundtrip_expr_test(test_expr, ctx);
}
#[test]
@@ -921,7 +937,7 @@ mod roundtrip_tests {
};
let ctx = SessionContext::new();
- roundtrip_expr_test!(test_expr, ctx);
+ roundtrip_expr_test(test_expr, ctx);
}
#[test]
@@ -975,7 +991,7 @@ mod roundtrip_tests {
let mut ctx = SessionContext::new();
ctx.register_udaf(dummy_agg);
- roundtrip_expr_test!(test_expr, ctx);
+ roundtrip_expr_test(test_expr, ctx);
}
#[test]
@@ -1000,7 +1016,7 @@ mod roundtrip_tests {
let mut ctx = SessionContext::new();
ctx.register_udf(udf);
- roundtrip_expr_test!(test_expr, ctx);
+ roundtrip_expr_test(test_expr, ctx);
}
#[test]
@@ -1012,7 +1028,7 @@ mod roundtrip_tests {
]));
let ctx = SessionContext::new();
- roundtrip_expr_test!(test_expr, ctx);
+ roundtrip_expr_test(test_expr, ctx);
}
#[test]
@@ -1020,7 +1036,7 @@ mod roundtrip_tests {
let test_expr = Expr::GroupingSet(GroupingSet::Rollup(vec![col("a"),
col("b")]));
let ctx = SessionContext::new();
- roundtrip_expr_test!(test_expr, ctx);
+ roundtrip_expr_test(test_expr, ctx);
}
#[test]
@@ -1028,6 +1044,6 @@ mod roundtrip_tests {
let test_expr = Expr::GroupingSet(GroupingSet::Cube(vec![col("a"),
col("b")]));
let ctx = SessionContext::new();
- roundtrip_expr_test!(test_expr, ctx);
+ roundtrip_expr_test(test_expr, ctx);
}
}