lazear opened a new issue, #5490:
URL: https://github.com/apache/arrow-rs/issues/5490
**Which part is this question about**
Library API / documentation
**Describe your question**
I am working on some code that streams rows of data from S3, using either CSV or Parquet files as the backing format, and converts them to JSON. Generally, I fetch groups of 50-250 rows at a time in a simple OFFSET/LIMIT pattern and filter on a couple of columns.

The parquet/arrow-rs path is substantially slower than using async-csv/serde-json on the same dataset (just converted to CSV or Parquet, respectively), compiled in release mode:
| Format | No filter | With filter |
| ---------- | ------------ | -------------- |
| CSV | 70 ms | 80 ms |
| Parquet | 800 ms | 13500 ms |
Am I doing something terribly wrong? Is there a better way to accomplish
this task?
I have pasted the code I'm using below.
**Additional context**
```rs
/// example filter
impl ResultFilter {
    fn dummy_filter(mask: ProjectionMask, s: String) -> Box<dyn ArrowPredicate> {
        Box::new(ArrowPredicateFn::new(mask, move |batch: RecordBatch| {
            let array = batch
                .column_by_name("dummy")
                .ok_or_else(|| ArrowError::SchemaError("invalid schema".into()))?;
            let filter = arrow_array::StringArray::new_scalar(s.to_string());
            arrow::compute::kernels::comparison::contains(array, &filter)
        }))
    }
}

/// Take a user-specified set of columns to parse from the file, and some
/// filters (enum) that can be converted into `ArrowPredicateFn`
pub async fn read_parquet_columns_arrow<'a, F>(
    mut file: F,
    columns: &'a [&str],
    filters: Vec<ResultFilter>,
    offset: usize,
    limit: usize,
) -> anyhow::Result<impl Stream<Item = Vec<u8>> + 'a>
where
    F: AsyncFileReader + Unpin + Send + 'static,
{
    let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await?;
    let filter_columns = filters
        .iter()
        .map(ResultFilter::required_column)
        .collect::<Vec<_>>();
    let system_columns = meta
        .parquet_schema()
        .root_schema()
        .get_fields()
        .iter()
        .enumerate()
        .filter_map(|(idx, col)| {
            if filter_columns.contains(&col.name()) {
                Some(idx)
            } else {
                None
            }
        })
        .collect::<Vec<usize>>();
    let user_columns = meta
        .parquet_schema()
        .root_schema()
        .get_fields()
        .iter()
        .enumerate()
        .filter_map(|(idx, col)| {
            if columns.contains(&col.name()) {
                Some(idx)
            } else {
                None
            }
        })
        .collect::<Vec<_>>();
    let mask = ProjectionMask::roots(
        meta.parquet_schema(),
        user_columns
            .as_slice()
            .into_iter()
            .chain(&system_columns)
            .copied(),
    );
    let row_filter = RowFilter::new(filters.into_iter().map(|f| f.build(mask.clone())).collect());
    let arrow_reader = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta.clone())
        .with_row_filter(row_filter)
        .with_offset(offset)
        .with_limit(limit)
        .with_projection(mask)
        .build()?;
    // let roots = &roots;
    Ok(arrow_reader
        .filter_map(|f| futures::future::ready(f.ok()))
        .filter_map(move |batch| {
            let inner = || {
                let mask = columns
                    .iter()
                    .filter_map(|name| Some(batch.schema().column_with_name(name)?.0))
                    .collect::<Vec<_>>();
                let batch = batch.project(&mask).ok()?;
                let buf = Vec::new();
                let mut wtr = arrow_json::ArrayWriter::new(buf);
                wtr.write(&batch).ok()?;
                wtr.finish().ok()?;
                Some(wtr.into_inner())
            };
            futures::future::ready(inner())
        }))
}
```
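For reference, this is roughly how the function gets driven from a tokio runtime (assuming the items above are in scope). The file path, the "value" column name, and the empty filter list are placeholders; in the real code the reader comes from S3, but a local `tokio::fs::File` satisfies the same `AsyncFileReader` bound:

```rs
use futures::StreamExt;

async fn dump_page() -> anyhow::Result<()> {
    // Placeholder reader: tokio::fs::File implements AsyncFileReader,
    // standing in here for the S3-backed reader used in production.
    let file = tokio::fs::File::open("data.parquet").await?;
    let columns = ["dummy", "value"];
    // Empty here for brevity; normally built from the ResultFilter enum above.
    let filters: Vec<ResultFilter> = vec![];

    // Fetch one "page" of 50 rows starting at row 450.
    let stream = read_parquet_columns_arrow(file, &columns, filters, 450, 50).await?;
    futures::pin_mut!(stream);
    while let Some(json) = stream.next().await {
        // Each item is one RecordBatch serialized as a JSON array of objects.
        println!("{}", String::from_utf8_lossy(&json));
    }
    Ok(())
}
```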