[https://issues.apache.org/jira/browse/ARROW-10517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17236702#comment-17236702]
Lance Dacey commented on ARROW-10517:
-------------------------------------
Regarding partition_filename_cb, a common use for me is building a full date
filename from the partition folder values:
{code:python}
# partition folders: year=2020/month=8/day=4
partition_filename_cb=lambda x: "-".join(str(y).zfill(2) for y in x) + ".parquet"
# resulting filename: 2020-08-04.parquet{code}
I do this to address a "many small files" situation in a few scenarios, though
perhaps there is a better way to go about it where this would not be necessary.
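In isolation, the callback is just a function from the tuple of partition values for one output directory to a filename (a minimal sketch with hypothetical partition values; the function name is made up):
{code:python}
def daily_filename(partition_values):
    # partition_values is the tuple of partition key values for one
    # output directory, e.g. ("2020", "8", "4") for year=2020/month=8/day=4
    return "-".join(str(v).zfill(2) for v in partition_values) + ".parquet"

daily_filename(("2020", "8", "4"))  # "2020-08-04.parquet"
{code}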
Scenario 1:
* I use turbodbc to query 6 different SQL servers every 30 minutes (48
schedules per day * 6 servers) directly into pyarrow tables, which I then write
to a partitioned dataset.
* This creates a lot of small files, which I then filter and rewrite to a
separate dataset with partition_filename_cb to consolidate the data into a
single daily file.
Scenario 2:
* I query some REST APIs (Zendesk and ServiceNow) every hour for any tickets
which have changed since my last query (based on the latest updated_at
timestamp).
* I partition this data based on the created_at date. Because of the download
frequency, and because a single download might contain tickets created on past
dates, we end up with a lot of small files: at least 24 files * the number of
unique dates which were updated.
* So again, I filter for any created_at partition which changed in the last
hour and rewrite a "final" consolidated version of the data in a separate
dataset using partition_filename_cb, which is then used for downstream tasks
and transformation.
* Ultimately, I need to ensure that our visualizations/reports only display
the latest version of each ticket even if it was updated a dozen times, so this
step generally includes sorting the data and dropping duplicates on some
unique constraints.
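The dedup step can be sketched with pandas (the ticket_id unique constraint and column names here are made up for illustration):
{code:python}
import pandas as pd

df = pd.DataFrame({
    "ticket_id": [1, 1, 2],
    "updated_at": pd.to_datetime(["2020-11-01", "2020-11-02", "2020-11-01"]),
    "status": ["open", "closed", "open"],
})

# sort by the update timestamp, then keep only the most recent
# version of each ticket
latest = (df.sort_values("updated_at")
            .drop_duplicates(subset=["ticket_id"], keep="last"))
{code}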
Both scenarios produce tiny files each download interval (or based on how I
partition the data), but the datasets are pretty large overall (scenario 1 is
over 500 million rows, and scenario 2 is over 70 million rows since March of
this year). Maybe partition_filename_cb is not strictly required; it just
seemed faster and more organized (under 300 ms to read a single file compared
to over 1 minute to filter for a day with 96 UUID filenames).
Are there any best practices here to avoid the need for the
partition_filename_cb function?
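For reference, the consolidation itself can be sketched against a local filesystem (the paths and the column name are made up; the real job reads the filtered staging partitions from Azure Blob and writes one daily file):
{code:python}
import os
import tempfile

import pyarrow as pa
import pyarrow.parquet as pq

staging = tempfile.mkdtemp()
# simulate two small files landing in the same daily partition
for i, values in enumerate(([1, 2], [3, 4])):
    pq.write_table(pa.table({"v": values}),
                   os.path.join(staging, f"part-{i}.parquet"))

# read all of the day's small files and rewrite one consolidated file
day = pq.ParquetDataset(staging).read()
out_path = os.path.join(tempfile.mkdtemp(), "2020-08-04.parquet")
pq.write_table(day, out_path)
{code}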
> [Python] Unable to read/write Parquet datasets with fsspec on Azure Blob
> ------------------------------------------------------------------------
>
> Key: ARROW-10517
> URL: https://issues.apache.org/jira/browse/ARROW-10517
> Project: Apache Arrow
> Issue Type: Bug
> Components: Python
> Affects Versions: 2.0.0
> Environment: Ubuntu 18.04
> Reporter: Lance Dacey
> Priority: Major
> Labels: azureblob, dataset, dataset-parquet-read,
> dataset-parquet-write, fsspec
> Fix For: 2.0.0
>
> Attachments: ss.PNG, ss2.PNG
>
>
>
> {code:python}
> # adal==1.2.5
> # adlfs==0.2.5
> # fsspec==0.7.4
> # pandas==1.1.3
> # pyarrow==2.0.0
> # azure-storage-blob==2.1.0
> # azure-storage-common==2.1.0
> import pyarrow as pa
> import pyarrow.dataset as ds
> from pyarrow.dataset import DirectoryPartitioning
> import fsspec
> fs = fsspec.filesystem(protocol='abfs',
>                        account_name=base.login,
>                        account_key=base.password)
> ds.write_dataset(data=table,
>                  base_dir="dev/test7",
>                  basename_template=None,
>                  format="parquet",
>                  partitioning=DirectoryPartitioning(
>                      pa.schema([("year", pa.string()),
>                                 ("month", pa.string()),
>                                 ("day", pa.string())])),
>                  schema=table.schema,
>                  filesystem=fs,
>                  )
> {code}
> I think this is due to how early versions of adlfs implement mkdir(), although
> I use write_to_dataset and write_table all of the time, so I am not sure why
> this would be an issue here.
> {code:python}
> ---------------------------------------------------------------------------
> RuntimeError Traceback (most recent call last)
> <ipython-input-40-bb38d83f896e> in <module>
> 13
> 14
> ---> 15 ds.write_dataset(data=table,
> 16 base_dir="dev/test7",
> 17 basename_template=None,
> /opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in
> write_dataset(data, base_dir, basename_template, format, partitioning,
> schema, filesystem, file_options, use_threads)
> 771 filesystem, _ = _ensure_fs(filesystem)
> 772
> --> 773 _filesystemdataset_write(
> 774 data, base_dir, basename_template, schema,
> 775 filesystem, partitioning, file_options, use_threads,
> /opt/conda/lib/python3.8/site-packages/pyarrow/_dataset.pyx in
> pyarrow._dataset._filesystemdataset_write()
> /opt/conda/lib/python3.8/site-packages/pyarrow/_fs.pyx in
> pyarrow._fs._cb_create_dir()
> /opt/conda/lib/python3.8/site-packages/pyarrow/fs.py in create_dir(self,
> path, recursive)
> 226 def create_dir(self, path, recursive):
> 227 # mkdir also raises FileNotFoundError when base directory is
> not found
> --> 228 self.fs.mkdir(path, create_parents=recursive)
> 229
> 230 def delete_dir(self, path):
> /opt/conda/lib/python3.8/site-packages/adlfs/core.py in mkdir(self, path,
> delimiter, exists_ok, **kwargs)
> 561 else:
> 562 ## everything else
> --> 563 raise RuntimeError(f"Cannot create
> {container_name}{delimiter}{path}.")
> 564 else:
> 565 if container_name in self.ls("") and path:
> RuntimeError: Cannot create dev/test7/2020/01/28.
> {code}
>
> Next, if I try to read a dataset (keep in mind that this works with
> read_table and ParquetDataset):
> {code:python}
> ds.dataset(source="dev/staging/evaluations",
>            format="parquet",
>            partitioning="hive",
>            exclude_invalid_files=False,
>            filesystem=fs
>            )
> {code}
>
> This doesn't seem to respect the filesystem connected to Azure Blob.
> {code:python}
> ---------------------------------------------------------------------------
> FileNotFoundError Traceback (most recent call last)
> <ipython-input-41-4de65fe95db7> in <module>
> ----> 1 ds.dataset(source="dev/staging/evaluations",
> 2 format="parquet",
> 3 partitioning="hive",
> 4 exclude_invalid_files=False,
> 5 filesystem=fs
> /opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in dataset(source,
> schema, format, filesystem, partitioning, partition_base_dir,
> exclude_invalid_files, ignore_prefixes)
> 669 # TODO(kszucs): support InMemoryDataset for a table input
> 670 if _is_path_like(source):
> --> 671 return _filesystem_dataset(source, **kwargs)
> 672 elif isinstance(source, (tuple, list)):
> 673 if all(_is_path_like(elem) for elem in source):
> /opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in
> _filesystem_dataset(source, schema, filesystem, partitioning, format,
> partition_base_dir, exclude_invalid_files, selector_ignore_prefixes)
> 426 fs, paths_or_selector = _ensure_multiple_sources(source,
> filesystem)
> 427 else:
> --> 428 fs, paths_or_selector = _ensure_single_source(source,
> filesystem)
> 429
> 430 options = FileSystemFactoryOptions(
> /opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in
> _ensure_single_source(path, filesystem)
> 402 paths_or_selector = [path]
> 403 else:
> --> 404 raise FileNotFoundError(path)
> 405
> 406 return filesystem, paths_or_selector
> FileNotFoundError: dev/staging/evaluations
> {code}
> This *does* work though when I list the blobs before passing them to
> ds.dataset:
> {code:python}
> blobs = wasb.list_blobs(container_name="dev", prefix="staging/evaluations")
> dataset = ds.dataset(source=["dev/" + blob.name for blob in blobs],
>                      format="parquet",
>                      partitioning="hive",
>                      exclude_invalid_files=False,
>                      filesystem=fs)
> {code}
> Next, if I downgrade to pyarrow 1.0.1, I am able to read datasets (but there
> is no write_datasets):
> {code:python}
> # adal==1.2.5
> # adlfs==0.2.5
> # azure-storage-blob==2.1.0
> # azure-storage-common==2.1.0
> # fsspec==0.7.4
> # pandas==1.1.3
> # pyarrow==1.0.1
> dataset = ds.dataset("dev/staging/evaluations", format="parquet",
>                      filesystem=fs)
> dataset.to_table().to_pandas()
> {code}
> edit:
> pyarrow 2.0.0
> fsspec 0.8.4
> adlfs v0.5.5
> pandas 1.1.4
> numpy 1.19.4
> azure.storage.blob 12.6.0
> {code:python}
> x = adlfs.AzureBlobFileSystem(account_name=name, account_key=key)
> type(x.find("dev/test", detail=True))
> # list
> fs = fsspec.filesystem(protocol="abfs", account_name=name, account_key=key)
> type(fs.find("dev/test", detail=True))
> # list
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)