[ 
https://issues.apache.org/jira/browse/ARROW-10517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17243963#comment-17243963
 ] 

Lance Dacey commented on ARROW-10517:
-------------------------------------

Yes, I think the uuid specifier would work fine for my purposes. Until now I have had pyarrow create the resulting filenames via the partition_filename_cb callback, but you are right - since I am dictating which filters to use in the first place (and each filter becomes one file), I could probably generate the filenames directly myself (a rough sketch of that alternative follows the code below).
 
{code:python}
import datetime

import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

# fs (an fsspec filesystem) and output_path are defined elsewhere in my pipeline

d1 = {
    "id": [1, 2, 3, 4, 5],
    "created_at": [
        datetime.date(2020, 5, 7),
        datetime.date(2020, 6, 19),
        datetime.date(2020, 9, 14),
        datetime.date(2020, 11, 22),
        datetime.date(2020, 12, 2),
    ],
    "updated_at": [
        datetime.date(2020, 12, 2),
        datetime.date(2020, 12, 2),
        datetime.date(2020, 12, 2),
        datetime.date(2020, 12, 2),
        datetime.date(2020, 12, 2),
    ],
}
df = pd.DataFrame(data=d1)
table = pa.Table.from_pandas(df)

# Historical dataset which keeps the full history of each ID each time it gets updated.
# Each created_at partition has a sub-partition for updated_at since historical data can
# change - this can generate many small files depending on how often my schedule runs
# to download data.
# I use pa.string() as the partition data type here because I have had issues with
# pa.date32(): sometimes I get an error that a string cannot be converted to date32(),
# even though using a date works perfectly fine.
ds.write_dataset(
    data=table,
    base_dir=output_path,
    format="parquet",
    partitioning=ds.partitioning(
        pa.schema([("created_at", pa.string()), ("updated_at", pa.string())]),
        flavor="hive",
    ),
    schema=table.schema,
    filesystem=fs,
)

# The next task reads the dataset and filters on the created_at partition
# (ignoring the updated_at partition).
dataset = ds.dataset(
    source=output_path,
    format="parquet",
    partitioning="hive",
    filesystem=fs,
)

# I save the unique filters (each created_at value) externally and build the
# dataset filter expression from them.
filter_expression = pq._filters_to_expression(
    filters=[
        [('created_at', '==', '2020-05-07')],
        [('created_at', '==', '2020-06-19')],
        [('created_at', '==', '2020-09-14')],
        [('created_at', '==', '2020-11-22')],
        [('created_at', '==', '2020-12-02')],
    ]
)

table = dataset.to_table(filter=filter_expression)

# Turn the table into a pandas dataframe to remove duplicates and retain the
# latest row for each ID.
df = (
    table.to_pandas(self_destruct=True)
    .sort_values(["id", "updated_at"], ascending=True)
    .drop_duplicates(["id"], keep="last")
)
table = pa.Table.from_pandas(df)

# This writes the final dataset.
# There would be one file per created_at partition, e.g.
# "container/created_at=2020-05-07/2020-05-07.parquet"
# Our visualization tool connects directly to these parquet files so we can
# report on the latest status of each ticket (not much attention is paid to
# the historical changes).
pq.write_to_dataset(
    table=table,
    root_path=output_path,
    partition_cols=["created_at"],
    partition_filename_cb=lambda x: str(x[-1]) + '.parquet',
    filesystem=fs,
)
{code}
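
For reference, a rough sketch of the alternative mentioned above - generating the filenames myself and writing one file per filter with write_table instead of relying on partition_filename_cb. The filename pattern is just illustrative, and created_at_values stands for the list of partition values I already track externally:

{code:python}
for value in created_at_values:
    # pull only the rows for this created_at partition
    partition_table = dataset.to_table(filter=ds.field("created_at") == value)
    # one file per created_at partition, named after the partition value itself
    path = f"{output_path}/created_at={value}/{value}.parquet"
    with fs.open(path, "wb") as f:
        pq.write_table(partition_table, f)
{code}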

*Note regarding the filters I use:* I generate these filters with code similar to a snippet I found in the pyarrow write_to_dataset function (pasted below). I could probably generate filenames instead, though, and use write_table like you mentioned.

{code:python}
        for keys, subgroup in data_df.groupby(partition_keys):
            if not isinstance(keys, tuple):
                keys = (keys,)
            subdir = '/'.join(
                ['{colname}={value}'.format(colname=name, value=val)
                 for name, val in zip(partition_cols, keys)])
{code}
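
My filter generation is something along these lines (a simplified sketch rather than my exact code; partition_cols here is just ["created_at"] and data_df is the staged dataframe from the snippet above):

{code:python}
partition_cols = ["created_at"]
filters = []
for keys, subgroup in data_df.groupby(partition_cols):
    if not isinstance(keys, tuple):
        keys = (keys,)
    # one filter (and ultimately one file) per unique partition value
    filters.append(
        [(name, "==", str(val)) for name, val in zip(partition_cols, keys)]
    )

filter_expression = pq._filters_to_expression(filters=filters)
{code}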


> [Python] Unable to read/write Parquet datasets with fsspec on Azure Blob
> ------------------------------------------------------------------------
>
>                 Key: ARROW-10517
>                 URL: https://issues.apache.org/jira/browse/ARROW-10517
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Python
>    Affects Versions: 2.0.0
>         Environment: Ubuntu 18.04
>            Reporter: Lance Dacey
>            Priority: Major
>              Labels: azureblob, dataset, dataset-parquet-read, 
> dataset-parquet-write, fsspec
>             Fix For: 2.0.0
>
>         Attachments: ss.PNG, ss2.PNG
>
>
>  
> {code:python}
> # adal==1.2.5
> # adlfs==0.2.5
> # fsspec==0.7.4
> # pandas==1.1.3
> # pyarrow==2.0.0
> # azure-storage-blob==2.1.0
> # azure-storage-common==2.1.0
> import pyarrow.dataset as ds
> import fsspec
> from pyarrow.dataset import DirectoryPartitioning
> fs = fsspec.filesystem(protocol='abfs', 
>                        account_name=base.login, 
>                        account_key=base.password)
> ds.write_dataset(data=table, 
>                  base_dir="dev/test7", 
>                  basename_template=None, 
>                  format="parquet",
>                  partitioning=DirectoryPartitioning(pa.schema([("year", 
> pa.string()), ("month", pa.string()), ("day", pa.string())])), 
>                  schema=table.schema,
>                  filesystem=fs, 
>                 )
> {code}
>  I think this is due to early versions of adlfs having mkdir(). Although I 
> use write_to_dataset and write_table all of the time, so I am not sure why 
> this would be an issue.
> {code:python}
> ---------------------------------------------------------------------------
> RuntimeError                              Traceback (most recent call last)
> <ipython-input-40-bb38d83f896e> in <module>
>      13 
>      14 
> ---> 15 ds.write_dataset(data=table, 
>      16                  base_dir="dev/test7",
>      17                  basename_template=None,
> /opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in 
> write_dataset(data, base_dir, basename_template, format, partitioning, 
> schema, filesystem, file_options, use_threads)
>     771     filesystem, _ = _ensure_fs(filesystem)
>     772 
> --> 773     _filesystemdataset_write(
>     774         data, base_dir, basename_template, schema,
>     775         filesystem, partitioning, file_options, use_threads,
> /opt/conda/lib/python3.8/site-packages/pyarrow/_dataset.pyx in 
> pyarrow._dataset._filesystemdataset_write()
> /opt/conda/lib/python3.8/site-packages/pyarrow/_fs.pyx in 
> pyarrow._fs._cb_create_dir()
> /opt/conda/lib/python3.8/site-packages/pyarrow/fs.py in create_dir(self, 
> path, recursive)
>     226     def create_dir(self, path, recursive):
>     227         # mkdir also raises FileNotFoundError when base directory is 
> not found
> --> 228         self.fs.mkdir(path, create_parents=recursive)
>     229 
>     230     def delete_dir(self, path):
> /opt/conda/lib/python3.8/site-packages/adlfs/core.py in mkdir(self, path, 
> delimiter, exists_ok, **kwargs)
>     561             else:
>     562                 ## everything else
> --> 563                 raise RuntimeError(f"Cannot create 
> {container_name}{delimiter}{path}.")
>     564         else:
>     565             if container_name in self.ls("") and path:
> RuntimeError: Cannot create dev/test7/2020/01/28.
> {code}
>  
> Next, if I try to read a dataset (keep in mind that this works with 
> read_table and ParquetDataset):
> {code:python}
> ds.dataset(source="dev/staging/evaluations", 
>            format="parquet", 
>            partitioning="hive",
>            exclude_invalid_files=False,
>            filesystem=fs
>           )
> {code}
>  
> This doesn't seem to respect the filesystem connected to Azure Blob.
> {code:python}
> ---------------------------------------------------------------------------
> FileNotFoundError                         Traceback (most recent call last)
> <ipython-input-41-4de65fe95db7> in <module>
> ----> 1 ds.dataset(source="dev/staging/evaluations", 
>       2            format="parquet",
>       3            partitioning="hive",
>       4            exclude_invalid_files=False,
>       5            filesystem=fs
> /opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in dataset(source, 
> schema, format, filesystem, partitioning, partition_base_dir, 
> exclude_invalid_files, ignore_prefixes)
>     669     # TODO(kszucs): support InMemoryDataset for a table input
>     670     if _is_path_like(source):
> --> 671         return _filesystem_dataset(source, **kwargs)
>     672     elif isinstance(source, (tuple, list)):
>     673         if all(_is_path_like(elem) for elem in source):
> /opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in 
> _filesystem_dataset(source, schema, filesystem, partitioning, format, 
> partition_base_dir, exclude_invalid_files, selector_ignore_prefixes)
>     426         fs, paths_or_selector = _ensure_multiple_sources(source, 
> filesystem)
>     427     else:
> --> 428         fs, paths_or_selector = _ensure_single_source(source, 
> filesystem)
>     429 
>     430     options = FileSystemFactoryOptions(
> /opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in 
> _ensure_single_source(path, filesystem)
>     402         paths_or_selector = [path]
>     403     else:
> --> 404         raise FileNotFoundError(path)
>     405 
>     406     return filesystem, paths_or_selector
> FileNotFoundError: dev/staging/evaluations
> {code}
> This *does* work though when I list the blobs before passing them to 
> ds.dataset:
> {code:python}
> blobs = wasb.list_blobs(container_name="dev", prefix="staging/evaluations")
> dataset = ds.dataset(source=["dev/" + blob.name for blob in blobs], 
>                      format="parquet", 
>                      partitioning="hive",
>                      exclude_invalid_files=False,
>                      filesystem=fs)
> {code}
> Next, if I downgrade to pyarrow 1.0.1, I am able to read datasets (but there 
> is no write_datasets):
> {code:python}
> # adal==1.2.5
> # adlfs==0.2.5
> # azure-storage-blob==2.1.0
> # azure-storage-common==2.1.0
> # fsspec==0.7.4
> # pandas==1.1.3
> # pyarrow==1.0.1
> dataset = ds.dataset("dev/staging/evaluations", format="parquet", 
> filesystem=fs)
> dataset.to_table().to_pandas()
> {code}
> edit: 
> pyarrow 2.0.0
> fsspec 0.8.4
> adlfs v0.5.5
> pandas 1.1.4
> numpy 1.19.4
> azure.storage.blob 12.6.0
> {code:python}
> x = adlfs.AzureBlobFileSystem(account_name=name, account_key=key)
> type(x.find("dev/test", detail=True))
> list
> fs = fsspec.filesystem(protocol="abfs", account_name=name, account_key=key)
> type(fs.find("dev/test", detail=True))
> list
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
