[
https://issues.apache.org/jira/browse/ARROW-12358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17398635#comment-17398635
]
Lance Dacey commented on ARROW-12358:
-------------------------------------
I do not clear my append dataset, but I need to add tasks to consolidate the
small files someday. If I download a source every hour, I will have a minimum
of 24 files in a single daily partition and some of them might be small.
But yes, I am basically describing a materialized view. I cannot rely on an
incremental refresh in many cases because I partition data based on the
created_at date and not the updated_at date.
Here is an example where the data was all updated today, but there were some
rows originally created days or even months ago.
{code:python}
import pyarrow as pa

table = pa.table(
    {
        # date_id is derived from the created_at timestamp
        "date_id": [20210114, 20210811, 20210812, 20210813],
        "created_at": ["2021-01-14 16:45:18", "2021-08-11 15:10:00", "2021-08-12 11:19:26", "2021-08-13 23:01:47"],
        "updated_at": ["2021-08-13 00:04:12", "2021-08-13 02:16:23", "2021-08-13 09:55:44", "2021-08-13 22:36:01"],
        "category": ["cow", "sheep", "dog", "cat"],
        "value": [0, 99, 17, 238],
    }
)
{code}
Partitioning this by date_id would save the following files in my "append"
dataset. Note that one of these rows is from January, so I cannot do an
incremental refresh from the minimum date onward because that would be too much
data in a real-world scenario.
{code:python}
written_paths = [
"dev/test/date_id=20210812/test-20210813114024-2.parquet",
"dev/test/date_id=20210813/test-20210813114024-3.parquet",
"dev/test/date_id=20210811/test-20210813114024-1.parquet",
"dev/test/date_id=20210114/test-20210813114024-0.parquet",
]
{code}
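For context, the hourly write that produces those paths looks roughly like the
sketch below; the basename_template value and the fs filesystem object are
placeholders for my actual naming scheme and storage, not exact code:
{code:python}
import pyarrow as pa
import pyarrow.dataset as ds

# Sketch of the hourly "append" write: a per-run unique basename_template
# keeps new files from clobbering what is already in each partition.
ds.write_dataset(
    table,
    base_dir="dev/test",
    format="parquet",
    partitioning=ds.partitioning(pa.schema([("date_id", pa.int64())]), flavor="hive"),
    basename_template="test-20210813114024-{i}.parquet",
    filesystem=fs,
)
{code}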
During my next task, I create a new dataset from the written_paths above (so a
dataset of only the new/changed data). Using .get_fragments() and partition
expressions, I ultimately generate a filter expression:
{code:python}
import pyarrow.dataset as ds

# Build a dataset over only the newly written files and collect the
# partition keys from each fragment's partition expression
fragments = ds.dataset(written_paths, filesystem=fs, partitioning="hive").get_fragments()
date_ids = set()
for frag in fragments:
    partitions = ds._get_partition_keys(frag.partition_expression)
    date_ids.add(partitions["date_id"])
    # ... other stuff

filter_expression = ds.field("date_id").isin(sorted(date_ids))
# <pyarrow.dataset.Expression is_in(date_id, {value_set=int32:[
#   20210114, 20210811, 20210812, 20210813
# ], skip_nulls=true})>
{code}
Finally, I use that filter to query my "append" dataset, which has all of the
historical data, so I read all of the data in each affected partition:
{code:python}
df = ds.dataset(source, filesystem=fs, partitioning="hive").to_table(filter=filter_expression).to_pandas()
{code}
Then I convert the table to pandas, sort and drop duplicates, convert back to a
table, and save to my "final" dataset with partition_filename_cb to overwrite
whatever was there. This means that if even a single row was updated within a
partition, I will read all of the data in that partition and recompute the
final version of each row. It also requires me to use the
"use_legacy_dataset" flag to support overwriting the existing partitions.
I found a custom implementation of drop_duplicates
(https://github.com/TomScheffers/pyarrow_ops/blob/main/pyarrow_ops/ops.py)
that works on pyarrow Tables, but I am still just using pandas for now. I keep
a close eye on the pyarrow.compute docs and have been slowly replacing the
pandas operations with operations on the pyarrow tables directly, which is
great.
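To make that last step concrete, the dedup-and-overwrite write looks roughly
like the sketch below; the dedup keys, sort column, and output filename are
illustrative assumptions, not my exact code:
{code:python}
import pyarrow as pa
import pyarrow.parquet as pq

# Keep only the latest version of each row (dedup keys are placeholders),
# then overwrite each partition in the "final" dataset with a single,
# deterministically named file.
df = df.sort_values("updated_at").drop_duplicates(subset=["created_at", "category"], keep="last")
final_table = pa.Table.from_pandas(df, preserve_index=False)

pq.write_to_dataset(
    final_table,
    root_path="dev/final",
    partition_cols=["date_id"],
    partition_filename_cb=lambda keys: "data-{}.parquet".format(keys[0]),
    use_legacy_dataset=True,
    filesystem=fs,
)
{code}
Some of that (like the sort) could already be done on the Arrow table with
pyarrow.compute before converting to pandas, which is the direction I am
heading.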
Your mention of a temporary staging area made me realize that I could replace
my messy staging append dataset (many small files) with something temporary
that I delete after each scheduled run, and then read from it and create a
consolidated historical append-only dataset similar to what I am doing in the
example above (one file per partition instead of potentially hundreds).
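A rough sketch of that idea, where the staging and history paths are
placeholders and I am assuming fs is a pyarrow filesystem (fsspec would use its
own delete call):
{code:python}
import pyarrow as pa
import pyarrow.dataset as ds

# Read everything from the temporary staging area, rewrite it as one
# consolidated file per partition in the historical dataset, then clear staging.
staging = ds.dataset("dev/staging", filesystem=fs, partitioning="hive")
ds.write_dataset(
    staging.to_table(),
    base_dir="dev/history",
    format="parquet",
    partitioning=ds.partitioning(pa.schema([("date_id", pa.int32())]), flavor="hive"),
    basename_template="consolidated-20210813-{i}.parquet",
    filesystem=fs,
)
fs.delete_dir("dev/staging")  # assumes a pyarrow FileSystem
{code}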
> [C++][Python][R][Dataset] Control overwriting vs appending when writing to
> existing dataset
> -------------------------------------------------------------------------------------------
>
> Key: ARROW-12358
> URL: https://issues.apache.org/jira/browse/ARROW-12358
> Project: Apache Arrow
> Issue Type: Improvement
> Components: C++
> Reporter: Joris Van den Bossche
> Priority: Major
> Labels: dataset
> Fix For: 6.0.0
>
>
> Currently, the dataset writing (e.g. with {{pyarrow.dataset.write_dataset}})
> uses a fixed filename template ({{"part\{i\}.ext"}}). This means that when
> you are writing to an existing dataset, you de facto overwrite previous data
> when using this default template.
> There is some discussion in ARROW-10695 about how the user can avoid this by
> ensuring the file names are unique (the user can specify the
> {{basename_template}} to be something unique). There is also ARROW-7706 about
> silently doubling data (so _not_ overwriting existing data) with the legacy
> {{parquet.write_to_dataset}} implementation.
> It could be good to have a "mode" when writing datasets that controls the
> different possible behaviours. Erroring when there is pre-existing data
> in the target directory is maybe the safest default, because both silently
> appending and silently overwriting can be surprising behaviour depending on
> your expectations.