milenkovicm commented on PR #1066:
URL:
https://github.com/apache/datafusion-ballista/pull/1066#issuecomment-2404537306
One additional thing: I don't think we're too far away from supporting
writes in the ballista client. The DataFusion team did a great job with `Sink` support.
```rust
use ballista::ext::BallistaExt;
use datafusion::{execution::options::ParquetReadOptions, prelude::SessionContext};

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    //
    // should we remove BallistaContext ?!
    //
    let ctx = SessionContext::ballista_standalone().await?;

    // directory holding the example parquet files
    let testdata = "examples/testdata";
    ctx.register_parquet(
        "test",
        &format!("{testdata}/alltypes_plain.parquet"),
        ParquetReadOptions::default(),
    )
    .await?;

    let df = ctx.sql("select * from test").await?;
    df.write_csv(
        "/directory/to_write/csv",
        Default::default(),
        Default::default(),
    )
    .await?;

    Ok(())
}
```
It does resolve and execute the plan, and it writes the file, but unfortunately
the file's content does not make sense (it is some kind of binary; not sure if it is Arrow):
```text
[DEBUG datafusion::physical_planner] Optimized physical plan:
DataSinkExec: sink=CsvSink(file_groups=[])
  ParquetExec: file_groups={1 group: [[/arrow-ballista/examples/testdata/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col]

[INFO ballista_scheduler::planner] planning query stages for job m1UAyt4
[INFO ballista_scheduler::state::task_manager] Submitting execution graph: ExecutionGraph[job_id=m1UAyt4, session_id=e294befd-26ce-4927-b447-e0779a1fcd6f, available_tasks=0, is_successful=false]
=========ResolvedStage[stage_id=1.0, partitions=1]=========
ShuffleWriterExec: None
  DataSinkExec: sink=CsvSink(file_groups=[])
    ParquetExec: file_groups={1 group: [[/arrow-ballista/examples/testdata/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col]

[INFO ballista_scheduler::display] === [m1UAyt4/1] Stage finished, physical plan with metrics ===
ShuffleWriterExec: None, metrics=[output_rows=1, input_rows=1, repart_time=1ns, write_time=19.28307ms]
  DataSinkExec: sink=CsvSink(file_groups=[]), metrics=[output_rows=8, elapsed_compute=1ns, bytes_scanned=671, num_predicate_creation_errors=0, page_index_rows_filtered=0, row_groups_matched_statistics=0, predicate_evaluation_errors=0, row_groups_matched_bloom_filter=0, row_groups_pruned_statistics=0, pushdown_rows_filtered=0, row_groups_pruned_bloom_filter=0, file_open_errors=0, file_scan_errors=0, time_elapsed_opening=13.991107ms, time_elapsed_processing=12.901754ms, time_elapsed_scanning_total=1.801347ms, time_elapsed_scanning_until_data=1.628914ms, page_index_eval_time=2ns, pushdown_eval_time=2ns]
    ParquetExec: file_groups={1 group: [[/arrow-ballista/examples/testdata/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], metrics=[]
```
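Side note on the "some kind of binary" output: Arrow IPC files begin with the magic bytes `ARROW1`, so sniffing the first few bytes of the written file would tell us whether the `ShuffleWriterExec` wrapping the sink caused shuffle data (Arrow IPC) to be written instead of CSV. A minimal sketch, not part of the PR (the helper name is made up):

```rust
// Arrow IPC files start with the 6-byte magic "ARROW1".
fn looks_like_arrow_ipc(bytes: &[u8]) -> bool {
    bytes.starts_with(b"ARROW1")
}

fn main() {
    // a CSV file would start with its header row, not the Arrow magic
    println!("{}", looks_like_arrow_ipc(b"id,bool_col\n")); // prints "false"
    println!("{}", looks_like_arrow_ipc(b"ARROW1\0\0"));    // prints "true"
}
```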
One question that arises is whether we should remove `BallistaContext` and use
`SessionContext::ballista_standalone().await?` instead. There are pros and cons to
both approaches; I would personally prefer `SessionContext`, but I'm afraid
there will be unforeseen issues.
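For context, the `SessionContext` approach relies on the extension-trait pattern, which is why the earlier snippet needs `use ballista::ext::BallistaExt;` in scope. A minimal sketch of the idea, with illustrative stand-in types rather than the real ballista/DataFusion API:

```rust
// stand-in for datafusion's SessionContext
struct SessionContext {
    mode: &'static str,
}

// extension trait adding a constructor to a type defined in another crate
trait BallistaExt {
    fn ballista_standalone() -> SessionContext;
}

impl BallistaExt for SessionContext {
    fn ballista_standalone() -> SessionContext {
        // the real implementation would spin up a standalone scheduler/executor
        SessionContext { mode: "ballista-standalone" }
    }
}

fn main() {
    // the trait must be in scope for this call to resolve, which mirrors
    // the `use ballista::ext::BallistaExt;` import in the snippet above
    let ctx = SessionContext::ballista_standalone();
    println!("{}", ctx.mode); // prints "ballista-standalone"
}
```

The upside is that users keep the familiar `SessionContext` API and only opt in to Ballista via the import; the downside is exactly the risk of unforeseen behavioral differences hiding behind an identical-looking API.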
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]