[ https://issues.apache.org/jira/browse/ARROW-12358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17398635#comment-17398635 ]

Lance Dacey commented on ARROW-12358:
-------------------------------------


I do not clear my append dataset, but I need to add tasks to consolidate the 
small files someday. If I download a source every hour, I will have a minimum 
of 24 files in a single daily partition and some of them might be small. 
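
As a rough sketch, that consolidation task could read one daily partition back and rewrite it as a single file. The paths, filesystem, and filename below are placeholders, not my real setup:

{code:python}
import pyarrow.dataset as ds
import pyarrow.fs
import pyarrow.parquet as pq

# Placeholder filesystem and paths; the real ones live in my scheduled tasks.
fs = pyarrow.fs.LocalFileSystem()
dataset = ds.dataset("dev/append", filesystem=fs, partitioning="hive")

date_id = 20210813  # the daily partition being compacted
# Read the (up to) 24 small hourly files for that day...
day = dataset.to_table(filter=ds.field("date_id") == date_id)
# ...and rewrite them as one consolidated file inside the partition.
# (The original small files would still need to be removed separately.)
pq.write_table(day, f"dev/append/date_id={date_id}/consolidated.parquet", filesystem=fs)
{code}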

But yes, I am basically describing a materialized view. I cannot rely on an 
incremental refresh in many cases because I partition data based on the 
created_at date and not the updated_at date.

Here is an example where the data was all updated today, but there were some 
rows originally created days or even months ago.

{code:python}
import pyarrow as pa

table = pa.table(
    {
        # date_id is derived from the created_at timestamp
        "date_id": [20210114, 20210811, 20210812, 20210813],
        "created_at": ["2021-01-14 16:45:18", "2021-08-11 15:10:00", "2021-08-12 11:19:26", "2021-08-13 23:01:47"],
        "updated_at": ["2021-08-13 00:04:12", "2021-08-13 02:16:23", "2021-08-13 09:55:44", "2021-08-13 22:36:01"],
        "category": ["cow", "sheep", "dog", "cat"],
        "value": [0, 99, 17, 238],
    }
)
{code}

Partitioning this by date_id would save the following files in my "append" 
dataset. Note that one row is from January, so I cannot simply do an 
incremental refresh from the minimum date: in a real-world scenario that would 
pull in far too much data. 

{code:python}
written_paths = [
    "dev/test/date_id=20210812/test-20210813114024-2.parquet",
    "dev/test/date_id=20210813/test-20210813114024-3.parquet",
    "dev/test/date_id=20210811/test-20210813114024-1.parquet",
    "dev/test/date_id=20210114/test-20210813114024-0.parquet",
]
{code}


During my next task, I create a new dataset from the written_paths above (so a 
dataset of only the new/changed data). Using .get_fragments() and partition 
expressions, I ultimately generate a filter expression:

{code:python}
import pyarrow.dataset as ds

# A dataset over only the new/changed files written above
fragments = ds.dataset(written_paths, filesystem=fs).get_fragments()
for frag in fragments:
    partitions = ds._get_partition_keys(frag.partition_expression)
    # ... other stuff

# The resulting filter expression looks like:
# <pyarrow.dataset.Expression is_in(date_id, {value_set=int32:[
#   20210114,
#   20210811,
#   20210812,
#   20210813
# ], skip_nulls=true})>
{code}
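
The "... other stuff" boils down to something like the sketch below. It assumes the hive-style directory names are discovered as partition keys; written_paths and fs are the same variables as above:

{code:python}
import pyarrow.dataset as ds

# Collect the distinct date_id value from each fragment's partition keys,
# then build a single is_in filter over all of them.
date_ids = set()
for frag in ds.dataset(written_paths, filesystem=fs, partitioning="hive").get_fragments():
    keys = ds._get_partition_keys(frag.partition_expression)  # e.g. {"date_id": 20210812}
    date_ids.add(keys["date_id"])

# One filter covering every partition that received new or changed rows
filter_expression = ds.field("date_id").isin(sorted(date_ids))
{code}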

Finally, I use that filter to query my "append" dataset, which has all of the 
historical data, so I read all of the data in each affected partition:
{code:python}
df = ds.dataset(source, filesystem=fs).to_table(filter=filter_expression).to_pandas()
{code}
Then I sort and drop duplicates in pandas, convert back to a table, and save to 
my "final" dataset with partition_filename_cb to overwrite whatever was there. 
This means that if even a single row was updated within a partition, I read all 
of the data in that partition and recompute the final version of each row. It 
also requires the "use_legacy_dataset" flag to support overwriting the existing 
partitions.

I found a custom implementation of drop_duplicates 
(https://github.com/TomScheffers/pyarrow_ops/blob/main/pyarrow_ops/ops.py) 
that works on pyarrow Tables, but I am still just using pandas for now. I keep 
a close eye on the pyarrow.compute docs and have been slowly replacing things I 
do in pandas with operations directly on the pyarrow tables, which is great.
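
For example, a drop_duplicates that keeps the latest row per key can be done on the Table itself, assuming a pyarrow release that exposes Table.group_by and Table.sort_by (the key and sort columns here are just placeholders):

{code:python}
import pyarrow as pa

def drop_duplicates(table: pa.Table, keys: list, sort_by: str) -> pa.Table:
    # Sort descending so the newest row for each key comes first.
    table = table.sort_by([(sort_by, "descending")])
    # Attach a row index, then keep the first index seen within each key group.
    table = table.append_column("_idx", pa.array(range(table.num_rows), type=pa.int64()))
    first = table.group_by(keys).aggregate([("_idx", "min")])
    return table.take(first["_idx_min"]).drop(["_idx"])

# e.g. drop_duplicates(table, keys=["category"], sort_by="updated_at")
{code}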

Your mention of the temporary staging area made me realize that I could replace 
my messy staging append dataset (many small files) with something temporary 
that I delete after each scheduled run, and then read from it to create a 
consolidated historical append-only dataset similar to what I am doing in the 
example above (one file per partition instead of potentially hundreds).
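
Something along these lines, with the paths again just placeholders:

{code:python}
import pyarrow as pa
import pyarrow.dataset as ds

# Placeholder paths; fs is the same filesystem handle as above.
staging = ds.dataset("dev/staging", filesystem=fs, partitioning="hive")

# Append this run's data to the consolidated historical dataset,
# writing one new file per partition with a per-run basename.
ds.write_dataset(
    staging,
    base_dir="dev/history",
    format="parquet",
    partitioning=ds.partitioning(pa.schema([("date_id", pa.int32())]), flavor="hive"),
    basename_template="run-20210813114024-{i}.parquet",
    filesystem=fs,
)

# The staging area is temporary, so drop it until the next scheduled run
# (assuming fs is a pyarrow FileSystem with delete_dir).
fs.delete_dir("dev/staging")
{code}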






> [C++][Python][R][Dataset] Control overwriting vs appending when writing to 
> existing dataset
> -------------------------------------------------------------------------------------------
>
>                 Key: ARROW-12358
>                 URL: https://issues.apache.org/jira/browse/ARROW-12358
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: C++
>            Reporter: Joris Van den Bossche
>            Priority: Major
>              Labels: dataset
>             Fix For: 6.0.0
>
>
> Currently, the dataset writing (e.g. with {{pyarrow.dataset.write_dataset}}) 
> uses a fixed filename template ({{"part\{i\}.ext"}}). This means that when 
> you are writing to an existing dataset, you de facto overwrite previous data 
> when using this default template.
> There is some discussion in ARROW-10695 about how the user can avoid this by 
> ensuring the file names are unique (the user can specify the 
> {{basename_template}} to be something unique). There is also ARROW-7706 about 
> silently doubling data (so _not_ overwriting existing data) with the legacy 
> {{parquet.write_to_dataset}} implementation. 
> It could be good to have a "mode" when writing datasets that controls the 
> different possible behaviours. And erroring when there is pre-existing data 
> in the target directory is maybe the safest default, because both silently 
> appending and silently overwriting can be surprising behaviour depending on 
> your expectations.


