mbrobbel commented on code in PR #14723:
URL: https://github.com/apache/datafusion/pull/14723#discussion_r1961173788
##########
datafusion/substrait/src/serializer.rs:
##########
@@ -26,28 +26,39 @@ use substrait::proto::Plan;
use std::fs::OpenOptions;
use std::io::{Read, Write};
+use std::path::Path;
-#[allow(clippy::suspicious_open_options)]
-pub async fn serialize(sql: &str, ctx: &SessionContext, path: &str) -> Result<()> {
+/// Plans a sql and serializes the generated logical plan to bytes.
+/// The bytes are then written into a file at `path`.
+///
+/// Returns an error if the file already exists.
+pub async fn serialize(
+ sql: &str,
+ ctx: &SessionContext,
+ path: impl AsRef<Path>,
+) -> Result<()> {
let protobuf_out = serialize_bytes(sql, ctx).await;
- let mut file = OpenOptions::new().create(true).write(true).open(path)?;
+
+ let mut file = OpenOptions::new().write(true).create_new(true).open(path)?;
file.write_all(&protobuf_out?)?;
Ok(())
}
+/// Plans a sql and serializes the generated logical plan to bytes.
pub async fn serialize_bytes(sql: &str, ctx: &SessionContext) -> Result<Vec<u8>> {
let df = ctx.sql(sql).await?;
let plan = df.into_optimized_plan()?;
let proto = producer::to_substrait_plan(&plan, &ctx.state())?;
let mut protobuf_out = Vec::<u8>::new();
- proto.encode(&mut protobuf_out).map_err(|e| {
- DataFusionError::Substrait(format!("Failed to encode substrait plan: {e}"))
- })?;
+ proto
+ .encode(&mut protobuf_out)
+ .map_err(|e| DataFusionError::Substrait(format!("Failed to encode plan: {e}")))?;
Ok(protobuf_out)
}
-pub async fn deserialize(path: &str) -> Result<Box<Plan>> {
+/// Reads the file at `path` and deserializes a plan from the bytes.
+pub async fn deserialize(path: impl AsRef<Path>) -> Result<Box<Plan>> {
Review Comment:
Follow-up suggestion (breaking change): this function is marked `async` but
it doesn't have to be (see comment on `deserialize_bytes`).
```suggestion
/// Reads the file at `path` and deserializes a plan from the bytes.
pub fn deserialize(path: impl AsRef<Path>) -> Result<Box<Plan>> {
```
##########
datafusion/substrait/src/serializer.rs:
##########
@@ -26,28 +26,39 @@ use substrait::proto::Plan;
use std::fs::OpenOptions;
use std::io::{Read, Write};
+use std::path::Path;
-#[allow(clippy::suspicious_open_options)]
-pub async fn serialize(sql: &str, ctx: &SessionContext, path: &str) -> Result<()> {
+/// Plans a sql and serializes the generated logical plan to bytes.
+/// The bytes are then written into a file at `path`.
+///
+/// Returns an error if the file already exists.
+pub async fn serialize(
+ sql: &str,
+ ctx: &SessionContext,
+ path: impl AsRef<Path>,
+) -> Result<()> {
let protobuf_out = serialize_bytes(sql, ctx).await;
- let mut file = OpenOptions::new().create(true).write(true).open(path)?;
+
+ let mut file = OpenOptions::new().write(true).create_new(true).open(path)?;
file.write_all(&protobuf_out?)?;
Review Comment:
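Follow-up suggestion: use `tokio::io::AsyncWriteExt::write_all` instead of
the blocking `std::io::Write::write_all` in this async function. Note that
this also requires opening the file with `tokio::fs::OpenOptions` (which
supports `create_new(true)` as well), because a `std::fs::File` does not
implement tokio's `AsyncWrite`.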
##########
datafusion/substrait/src/serializer.rs:
##########
@@ -56,8 +67,9 @@ pub async fn deserialize(path: &str) -> Result<Box<Plan>> {
deserialize_bytes(protobuf_in).await
}
+/// Deserializes a plan from the bytes.
Review Comment:
Follow-up suggestion (breaking change): this function is marked `async` but
it doesn't have to be.
##########
datafusion/substrait/tests/cases/serialize.rs:
##########
@@ -31,6 +32,25 @@ mod tests {
use substrait::proto::rel_common::{Emit, EmitKind};
use substrait::proto::{rel, RelCommon};
+ #[tokio::test]
+ async fn serialize_to_file() -> Result<()> {
+ let ctx = create_context().await?;
+ let path = "tests/serialize_to_file.bin";
+ let sql = "SELECT a, b FROM data";
+
+ // Test case 1: serializing to a non-existing file should succeed.
+ serializer::serialize(sql, &ctx, path).await?;
+ serializer::deserialize(path).await?;
+
+ // Test case 3: serializing to an existing file should fail.
Review Comment:
```suggestion
// Test case 2: serializing to an existing file should fail.
```
##########
datafusion/substrait/src/serializer.rs:
##########
@@ -26,28 +26,39 @@ use substrait::proto::Plan;
use std::fs::OpenOptions;
use std::io::{Read, Write};
+use std::path::Path;
-#[allow(clippy::suspicious_open_options)]
-pub async fn serialize(sql: &str, ctx: &SessionContext, path: &str) -> Result<()> {
+/// Plans a sql and serializes the generated logical plan to bytes.
+/// The bytes are then written into a file at `path`.
+///
+/// Returns an error if the file already exists.
+pub async fn serialize(
+ sql: &str,
+ ctx: &SessionContext,
+ path: impl AsRef<Path>,
+) -> Result<()> {
let protobuf_out = serialize_bytes(sql, ctx).await;
- let mut file = OpenOptions::new().create(true).write(true).open(path)?;
+
+ let mut file = OpenOptions::new().write(true).create_new(true).open(path)?;
file.write_all(&protobuf_out?)?;
Ok(())
}
+/// Plans a sql and serializes the generated logical plan to bytes.
pub async fn serialize_bytes(sql: &str, ctx: &SessionContext) -> Result<Vec<u8>> {
Review Comment:
If we replace the `sql` arg with a `DataFrame` arg, this function (and
`serialize`) can be non-async.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]