legout commented on issue #13747:
URL: https://github.com/apache/arrow/issues/13747#issuecomment-1271622134
> > Thank you @legout. Duckdb works really well, but polars is struggling. Maybe I am doing something wrong.
> > But anyway here is how it worked for me
> > ```python
> > # pyarrow 8.0.0
> > # duckdb 0.5.1
> > # polars 0.14.18
> > import duckdb
> > import polars as pl
> > import pyarrow
> > from pyarrow.dataset import dataset, field
> >
> > # fs is a filesystem instance (e.g. s3fs/pyarrow) pointing at the bucket, created earlier
> > ib = dataset("install-base-from-vdw-standard/", filesystem=fs, partitioning="hive")
> >
> > ib.count_rows()
> > # 1415259797
> > ib.schema
> > """
> > bev: bool
> > market: int16
> > function_group: int32
> > part: int32
> > kdp: bool
> > kdp_accessory: bool
> > yearweek: int32
> > qty_part: int32
> > vehicle_type: int32
> > model_year: int32
> > -- schema metadata --
> > pandas: '{"index_columns": [], "column_indexes": [{"name": null, "field_n' + 1081
> > """
> >
> > def do_duckdb():
> >     sql = """
> >     SELECT i.part,
> >            i.bev,
> >            i.market,
> >            i.kdp_accessory,
> >            i.yearweek,
> >            SUM(i.qty_part) AS qty_part_sum
> >     FROM ib i
> >     WHERE i.vehicle_type = 536
> >     GROUP BY
> >         i.part,
> >         i.bev,
> >         i.market,
> >         i.kdp_accessory,
> >         i.yearweek
> >     """
> >     conn = duckdb.connect(":memory:")
> >     result = conn.execute(sql)
> >     table = result.fetch_arrow_table()
> >     return table
> >
> >
> > def do_polar():
> >     table = (
> >         pl.scan_ds(ib)
> >         .filter(pl.col("vehicle_type") == 536)
> >         .groupby(["part", "bev", "market", "kdp_accessory", "yearweek"])
> >         .agg(pl.col("qty_part").sum())
> >         .collect()
> >         .to_arrow()
> >     )
> >     return table
> >
> > %time table = do_duckdb()
> > # memory consumption increased temporarily by 2 GB, 18.8 s
> >
> > %time table = do_polar()
> > # memory consumption increased slowly to fill almost all memory (32 GB)
> > # before normalizing, 4 min 54 s
> > ```
> >
> > Note, duckdb was 50% faster than my pyarrow implementation for doing it on a table. duckdb used a little more RAM, but not much. Doing it with pyarrow in batches uses less RAM, but slows it down a little.
> > ```python
> > def do_pyarrow_batches():
> >     tables = []
> >     columns = ["part", "bev", "market", "kdp_accessory", "yearweek", "qty_part"]
> >     filters = field("vehicle_type") == 536
> >     agg = [("qty_part", "sum")]
> >     group_by = ["part", "bev", "market", "kdp_accessory", "yearweek"]
> >     for batch in ib.to_batches(columns=columns, filter=filters, batch_size=1_000_000):
> >         t = pyarrow.Table.from_batches([batch])
> >         tables.append(t.group_by(group_by).aggregate(agg))
> >     table = pyarrow.concat_tables(tables)
> >     # the per-batch results are only partial aggregates, so aggregate again
> >     # over the concatenated table (aggregate output columns are named "<col>_<agg>")
> >     new_agg = []
> >     for a in agg:
> >         how = a[1].replace("hash_", "")
> >         new_agg.append((a[0] + "_" + how, "sum"))
> >     table = table.group_by(group_by).aggregate(new_agg)
> >     return table
> >
> > def do_pyarrow_table():
> >     table = (
> >         ib.to_table(
> >             columns=["part", "bev", "market", "kdp_accessory", "yearweek", "qty_part"],
> >             filter=field("vehicle_type") == 536,
> >         )
> >         .group_by(["part", "bev", "market", "kdp_accessory", "yearweek"])
> >         .aggregate([("qty_part", "sum")])
> >     )
> >     return table
> >
> > ```
>
> Polars is not able to push down the filter into a pyarrow dataset, only into readers directly implemented by polars (scan_parquet, scan_csv, etc.). So that means the dataset is first completely read into memory. We could see if we can translate our predicate to filters pyarrow understands.

For me, a pyarrow dataset is the entry point for running queries/analytics on remote (s3 bucket) parquet datasets without loading all the data into memory. Is this possible with scan_parquet, too?
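
Until such a translation exists, one possible workaround, just a minimal sketch built from the calls already shown above (the function name `do_polar_prefiltered` is made up here): apply the column projection and the filter through the pyarrow dataset yourself, and hand only the reduced table to polars for the aggregation, so polars never scans the full dataset.

```python
# Sketch: let pyarrow push the projection/filter down into the dataset scan,
# then do the group-by in polars on the already-reduced table.
# Assumes `ib`, `field` and `pl` from the snippets above.
def do_polar_prefiltered():
    reduced = ib.to_table(
        columns=["part", "bev", "market", "kdp_accessory", "yearweek", "qty_part"],
        filter=field("vehicle_type") == 536,
    )
    return (
        pl.from_arrow(reduced)
        .groupby(["part", "bev", "market", "kdp_accessory", "yearweek"])
        .agg(pl.col("qty_part").sum())
        .to_arrow()
    )
```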