alamb commented on code in PR #8651:
URL: https://github.com/apache/arrow-datafusion/pull/8651#discussion_r1437093316
##########
datafusion-cli/src/print_format.rs:
##########
@@ -44,141 +48,134 @@ impl FromStr for PrintFormat {
}
macro_rules! batches_to_json {
- ($WRITER: ident, $batches: expr) => {{
- let mut bytes = vec![];
+ ($WRITER: ident, $writer: expr, $batches: expr) => {{
{
- let mut writer = $WRITER::new(&mut bytes);
- $batches.iter().try_for_each(|batch| writer.write(batch))?;
- writer.finish()?;
+ if !$batches.is_empty() {
+ let mut json_writer = $WRITER::new(&mut *$writer);
+ for batch in $batches {
+ json_writer.write(batch)?;
+ }
+ json_writer.finish()?;
+ writeln!($writer)?;
+ }
}
- String::from_utf8(bytes).map_err(|e|
DataFusionError::External(Box::new(e)))?
+ Ok(()) as Result<()>
}};
}
-fn print_batches_with_sep(batches: &[RecordBatch], delimiter: u8) ->
Result<String> {
- let mut bytes = vec![];
- {
- let builder = WriterBuilder::new()
- .with_header(true)
- .with_delimiter(delimiter);
- let mut writer = builder.build(&mut bytes);
- for batch in batches {
- writer.write(batch)?;
- }
+fn print_batches_with_sep<W: std::io::Write>(
+ writer: &mut W,
+ batches: &[RecordBatch],
+ delimiter: u8,
+ with_header: bool,
+) -> Result<()> {
+ let builder = WriterBuilder::new()
+ .with_header(with_header)
+ .with_delimiter(delimiter);
+ let mut csv_writer = builder.build(writer);
+
+ for batch in batches {
+ csv_writer.write(batch)?;
}
- let formatted =
- String::from_utf8(bytes).map_err(|e|
DataFusionError::External(Box::new(e)))?;
- Ok(formatted)
-}
-fn keep_only_maxrows(s: &str, maxrows: usize) -> String {
- let lines: Vec<String> = s.lines().map(String::from).collect();
-
- assert!(lines.len() >= maxrows + 4); // 4 lines for top and bottom border
-
- let last_line = &lines[lines.len() - 1]; // bottom border line
-
- let spaces = last_line.len().saturating_sub(4);
- let dotted_line = format!("| .{:<spaces$}|", "", spaces = spaces);
-
- let mut result = lines[0..(maxrows + 3)].to_vec(); // Keep top border and
`maxrows` lines
- result.extend(vec![dotted_line; 3]); // Append ... lines
- result.push(last_line.clone());
-
- result.join("\n")
+ Ok(())
}
-fn format_batches_with_maxrows(
+fn format_batches_with_maxrows<W: std::io::Write>(
+ writer: &mut W,
batches: &[RecordBatch],
maxrows: MaxRows,
-) -> Result<String> {
+) -> Result<()> {
match maxrows {
MaxRows::Limited(maxrows) => {
- // Only format enough batches for maxrows
+ // Filter batches to meet the maxrows condition
let mut filtered_batches = Vec::new();
- let mut batches = batches;
- let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
- if row_count > maxrows {
- let mut accumulated_rows = 0;
-
- for batch in batches {
+ let mut row_count: usize = 0;
+ for batch in batches {
+ if row_count + batch.num_rows() > maxrows {
+ // If adding this batch exceeds maxrows, slice the batch
+ let limit = maxrows - row_count;
+ let sliced_batch = batch.slice(0, limit);
+ filtered_batches.push(sliced_batch);
+ break;
+ } else {
filtered_batches.push(batch.clone());
- if accumulated_rows + batch.num_rows() > maxrows {
- break;
- }
- accumulated_rows += batch.num_rows();
+ row_count += batch.num_rows();
}
-
- batches = &filtered_batches;
- }
-
- let mut formatted = format!(
- "{}",
- pretty_format_batches_with_options(batches,
&DEFAULT_FORMAT_OPTIONS)?,
- );
-
- if row_count > maxrows {
- formatted = keep_only_maxrows(&formatted, maxrows);
}
- Ok(formatted)
+ let formatted = pretty_format_batches_with_options(
+ &filtered_batches,
+ &DEFAULT_FORMAT_OPTIONS,
+ )?;
+ write!(writer, "{}", formatted)?;
}
MaxRows::Unlimited => {
- // maxrows not specified, print all rows
- Ok(format!(
- "{}",
- pretty_format_batches_with_options(batches,
&DEFAULT_FORMAT_OPTIONS)?,
- ))
+ let formatted =
+ pretty_format_batches_with_options(batches,
&DEFAULT_FORMAT_OPTIONS)?;
+ write!(writer, "{}", formatted)?;
}
}
+
+ Ok(())
}
impl PrintFormat {
Review Comment:
It doesn't have to be for this PR, but I think this code could be simplified
by putting all the state management into a single object.
Perhaps something like this (it would also need a field for the table
being built up):
```rust
pub struct OutputState {
format: PrintFormat,
with_header: bool,
maxrows: MaxRows,
rows_printed: usize,
seen_header: bool
}
```
Then it would be possible to avoid collecting all the batches
prior to printing, with a main loop like:
```rust
// configure output state ...
let state = OutputState::new()
.with_header(...);
while let Some(batch) = plan.next().await? {
// for formats other than table, this would print out the batch
state.print_batch(batch)
}
// for table format, flush any buffered tables
state.finish()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]