devinjdangelo commented on issue #7079:
URL:
https://github.com/apache/arrow-datafusion/issues/7079#issuecomment-1652538831
Thanks @alamb for the pointer. I will take a look at that other write code!
I was able to develop a POC implementation on top of the existing `write_json`
method to prove out the potential for a significant speedup. I changed the above
test code to time the read and the write separately:
```rust
use std::{sync::Arc, time::Instant};

use datafusion::prelude::*;
use datafusion_common::DataFusionError;
use object_store::local::LocalFileSystem;
use url::Url;

const FILENAME: &str =
    "/home/dev/arrow-datafusion/benchmarks/data/tpch_sf10/lineitem/part-0.parquet";

#[tokio::main]
async fn main() -> Result<(), DataFusionError> {
    let ctx = SessionContext::new();
    let local = Arc::new(LocalFileSystem::new());
    let local_url = Url::parse("file://local").unwrap();
    ctx.runtime_env().register_object_store(&local_url, local);

    let read_options = ParquetReadOptions {
        file_extension: ".parquet",
        table_partition_cols: vec![],
        parquet_pruning: None,
        skip_metadata: None,
    };

    // Time the read: load the selected columns fully into memory.
    let start = Instant::now();
    let df = ctx
        .read_parquet(FILENAME, read_options)
        .await?
        // select a few columns with types compatible with write_json method
        .select_columns(&["l_orderkey", "l_partkey", "l_receiptdate"])?
        .cache()
        .await?;
    let elapsed = start.elapsed();
    println!("read parquet to memory took -> {elapsed:?}");

    // Time the write separately from the read.
    let start2 = Instant::now();
    df.write_json("file://local/home/dev/arrow-datafusion/test_out/")
        .await?;
    let elapsed2 = start2.elapsed();
    println!("write as json to disk took -> {elapsed2:?}");
    Ok(())
}
```
As a baseline, the current `write_json` on main results in the following
timings:
read parquet to memory took -> 10.985273516s
write as json to disk took -> 191.64463431s
I modified the `plan_to_json` method as follows:
```rust
// Beyond what the module already imports, this version relies on
// tokio::task::JoinSet, futures::TryStreamExt (for try_next), and
// tokio::io::AsyncWriteExt (for write_all and shutdown).
pub async fn plan_to_json(
    task_ctx: Arc<TaskContext>,
    plan: Arc<dyn ExecutionPlan>,
    path: impl AsRef<str>,
) -> Result<()> {
    let path = path.as_ref();
    let parsed = ListingTableUrl::parse(path)?;
    let object_store_url = parsed.object_store();
    let store = task_ctx.runtime_env().object_store(&object_store_url)?;
    // One task per output partition; each writes its own part-{i}.json file.
    let mut join_set = JoinSet::new();
    for i in 0..plan.output_partitioning().partition_count() {
        let storeref = store.clone();
        let plan: Arc<dyn ExecutionPlan> = plan.clone();
        let filename = format!("{}/part-{i}.json", parsed.prefix());
        let file = object_store::path::Path::parse(filename)?;
        let mut stream = plan.execute(i, task_ctx.clone())?;
        join_set.spawn(async move {
            let (_, mut multipart_writer) = storeref.put_multipart(&file).await?;
            // Serialize each batch to JSON in its own task so the encoding
            // work can run in parallel.
            let mut inner_join_set = JoinSet::new();
            while let Some(batch) = stream.try_next().await? {
                inner_join_set.spawn(async move {
                    let buffer = Vec::with_capacity(1024);
                    let mut writer = json::LineDelimitedWriter::new(buffer);
                    writer.write(&batch)?;
                    let r: Result<Vec<u8>, DataFusionError> = Ok(writer.into_inner());
                    r
                });
            }
            // Write the serialized buffers as they finish. join_next yields
            // tasks in completion order, so batches may not be written in
            // their original order.
            while let Some(result) = inner_join_set.join_next().await {
                match result {
                    Ok(r) => {
                        let batch = r?;
                        multipart_writer.write_all(&batch).await?;
                    }
                    Err(e) => {
                        if e.is_panic() {
                            std::panic::resume_unwind(e.into_panic());
                        } else {
                            unreachable!();
                        }
                    }
                }
            }
            // Finalize the multipart upload for this partition.
            multipart_writer
                .shutdown()
                .await
                .map_err(DataFusionError::from)
        });
    }

    // Wait for every partition task and surface the first failure.
    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(res) => res?, // propagate DataFusion error
            Err(e) => {
                if e.is_panic() {
                    std::panic::resume_unwind(e.into_panic());
                } else {
                    unreachable!();
                }
            }
        }
    }
    Ok(())
}
```
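The core idea in the POC (serialize batches concurrently, then write the resulting buffers through a single writer) can be sketched without DataFusion or tokio. The following is only an illustration under stated assumptions: `Batch`, `serialize_batch`, and `write_batches_parallel` are hypothetical names, plain `std::thread::scope` threads stand in for tokio tasks, and the JSON is formatted by hand rather than with `LineDelimitedWriter`. Unlike the `JoinSet` version above, this sketch joins the workers in spawn order, so the output keeps the original batch order.

```rust
use std::io::{self, Write};
use std::thread;

/// Hypothetical stand-in for a RecordBatch: rows of (l_orderkey, l_receiptdate).
type Batch = Vec<(i64, String)>;

/// Serialization step: encode one batch as newline-delimited JSON bytes.
/// Assumes the date strings contain no characters that need JSON escaping.
fn serialize_batch(batch: &Batch) -> Vec<u8> {
    let mut buf = Vec::with_capacity(1024);
    for (key, date) in batch {
        writeln!(buf, "{{\"l_orderkey\":{key},\"l_receiptdate\":\"{date}\"}}")
            .expect("writing to a Vec<u8> does not fail");
    }
    buf
}

/// Serialize every batch on its own scoped thread, then write the buffers to
/// `sink` one at a time, in the order the batches were given.
fn write_batches_parallel<W: Write>(batches: &[Batch], sink: &mut W) -> io::Result<()> {
    thread::scope(|scope| -> io::Result<()> {
        // Start all serialization work before waiting on any of it.
        let handles: Vec<_> = batches
            .iter()
            .map(|batch| scope.spawn(move || serialize_batch(batch)))
            .collect();
        // Join in spawn order so the single writer sees batches in order.
        for handle in handles {
            let bytes = handle
                .join()
                .unwrap_or_else(|panic| std::panic::resume_unwind(panic));
            sink.write_all(&bytes)?;
        }
        Ok(())
    })
}
```

Calling `write_batches_parallel(&batches, &mut file)` with any `Write` sink (a `File`, a `Vec<u8>`) exercises the pattern. Spawning one thread per batch is only reasonable for a small demo; a real implementation would bound the concurrency and the amount of buffered data.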
Now the timings are:
read parquet to memory took -> 10.990137608s
write as json to disk took -> 18.332761845s
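To put the two write timings side by side, the ratio can be computed directly. This is just a small arithmetic helper; `speedup` is a hypothetical name and not part of the benchmark code.

```rust
use std::time::Duration;

/// Ratio of the baseline duration to the improved duration
/// (values above 1.0 mean the improved run was faster).
fn speedup(baseline: Duration, improved: Duration) -> f64 {
    baseline.as_secs_f64() / improved.as_secs_f64()
}
```

Passing `Duration::from_secs_f64(191.64463431)` and `Duration::from_secs_f64(18.332761845)` gives a ratio of roughly 10.45, i.e. the write step is about ten times faster while the read time is unchanged.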