[
https://issues.apache.org/jira/browse/ARROW-10517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235388#comment-17235388
]
Lance Dacey commented on ARROW-10517:
-------------------------------------
Latest adlfs (0.5.5):
This really creates the test.parquet file as well, not just the directory:
{code:java}
fs.mkdir("dev/test99999999999/2020/01/28/test.parquet", create_parents=True)
{code}
And if I try to run the same line again, it fails because the partition
exists:
{code:python}
---------------------------------------------------------------------------
StorageErrorException: Operation returned an invalid status 'The specified blob
already exists.'
During handling of the above exception, another exception occurred:
ResourceExistsError Traceback (most recent call last)
/c/airflow/test.py in <module>
----> 6 fs.mkdir("dev/test99999999999/2020/01/28/test.parquet",
create_parents=True)
~/miniconda3/envs/airflow/lib/python3.8/site-packages/adlfs/spec.py in
mkdir(self, path, delimiter, exist_ok, **kwargs)
880
881 def mkdir(self, path, delimiter="/", exist_ok=False, **kwargs):
--> 882 maybe_sync(self._mkdir, self, path, delimiter, exist_ok)
883
884 async def _mkdir(self, path, delimiter="/", exist_ok=False,
**kwargs):
~/miniconda3/envs/airflow/lib/python3.8/site-packages/fsspec/asyn.py in
maybe_sync(func, self, *args, **kwargs)
98 if inspect.iscoroutinefunction(func):
99 # run the awaitable on the loop
--> 100 return sync(loop, func, *args, **kwargs)
101 else:
102 # just call the blocking function
~/miniconda3/envs/airflow/lib/python3.8/site-packages/fsspec/asyn.py in
sync(loop, func, callback_timeout, *args, **kwargs)
69 if error[0]:
70 typ, exc, tb = error[0]
---> 71 raise exc.with_traceback(tb)
72 else:
73 return result[0]
~/miniconda3/envs/airflow/lib/python3.8/site-packages/fsspec/asyn.py in f()
53 if callback_timeout is not None:
54 future = asyncio.wait_for(future, callback_timeout)
---> 55 result[0] = await future
56 except Exception:
57 error[0] = sys.exc_info()
~/miniconda3/envs/airflow/lib/python3.8/site-packages/adlfs/spec.py in
_mkdir(self, path, delimiter, exist_ok, **kwargs)
918 container=container_name
919 )
--> 920 await container_client.upload_blob(name=path, data="")
921 else:
922 ## everything else
~/miniconda3/envs/airflow/lib/python3.8/site-packages/azure/core/tracing/decorator_async.py
in wrapper_use_tracer(*args, **kwargs)
72 span_impl_type = settings.tracing_implementation()
73 if span_impl_type is None:
---> 74 return await func(*args, **kwargs)
75
76 # Merge span is parameter is set, but only if no explicit
parent are passed
~/miniconda3/envs/airflow/lib/python3.8/site-packages/azure/storage/blob/aio/_container_client_async.py
in upload_blob(self, name, data, blob_type, length, metadata, **kwargs)
715 timeout = kwargs.pop('timeout', None)
716 encoding = kwargs.pop('encoding', 'UTF-8')
--> 717 await blob.upload_blob(
718 data,
719 blob_type=blob_type,
~/miniconda3/envs/airflow/lib/python3.8/site-packages/azure/core/tracing/decorator_async.py
in wrapper_use_tracer(*args, **kwargs)
72 span_impl_type = settings.tracing_implementation()
73 if span_impl_type is None:
---> 74 return await func(*args, **kwargs)
75
76 # Merge span is parameter is set, but only if no explicit
parent are passed
~/miniconda3/envs/airflow/lib/python3.8/site-packages/azure/storage/blob/aio/_blob_client_async.py
in upload_blob(self, data, blob_type, length, metadata, **kwargs)
267 **kwargs)
268 if blob_type == BlobType.BlockBlob:
--> 269 return await upload_block_blob(**options)
270 if blob_type == BlobType.PageBlob:
271 return await upload_page_blob(**options)
~/miniconda3/envs/airflow/lib/python3.8/site-packages/azure/storage/blob/aio/_upload_helpers.py
in upload_block_blob(client, data, stream, length, overwrite, headers,
validate_content, max_concurrency, blob_settings, encryption_options, **kwargs)
131 except StorageErrorException as error:
132 try:
--> 133 process_storage_error(error)
134 except ResourceModifiedError as mod_error:
135 if not overwrite:
~/miniconda3/envs/airflow/lib/python3.8/site-packages/azure/storage/blob/_shared/response_handlers.py
in process_storage_error(storage_error)
145 error.error_code = error_code
146 error.additional_info = additional_data
--> 147 raise error
148
149
ResourceExistsError: The specified blob already exists.
RequestId:85acda2e-401e-0080-5166-be4d32000000
Time:2020-11-19T11:23:28.7193393Z
ErrorCode:BlobAlreadyExists
Error:None
{code}
If I switch to adlfs 0.2.5 (old version which works for ds.dataset()), the
error is different when I try to create a directory which already exists, but I
also cannot create any directory at all for some reason. I also tried to create
an entirely new directory which definitely does not exist and ran into an error:
{code:python}
fs.mkdir("dev/testab1234123/2020/01/28/new.parquet", create_parents=True)
RuntimeError: Cannot create dev/testab1234123/2020/01/28/new.parquet.
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
----> 6 fs.mkdir("dev/test99999999999/2020/01/28/test.parquet",
create_parents=True)
~/miniconda3/envs/old/lib/python3.8/site-packages/adlfs/core.py in mkdir(self,
path, delimiter, exists_ok, **kwargs)
561 else:
562 ## everything else
--> 563 raise RuntimeError(f"Cannot create
{container_name}{delimiter}{path}.")
564 else:
565 if container_name in self.ls("") and path:
RuntimeError: Cannot create dev/test99999999999/2020/01/28/test.parquet.
{code}
But I *am* able to read a dataset which I could *not* do with adlfs 0.5.5 (I
get that error about a list of files instead of a dictionary using fs.find()
with the latest version).
So this is bizarre. I can only read data (with ds.dataset()) with an old
version of adlfs, and I can only write data with the newest version.
Even pq.read_table() will not work for me using the latest version of adlfs
(0.5.5):
{code:python}
----> 7 table = pq.read_table(source="dev/testing10/evaluations", columns=None,
filters=[('year', '==', '2020')], filesystem=fs)
~/miniconda3/envs/airflow/lib/python3.8/site-packages/pyarrow/parquet.py in
read_table(source, columns, use_threads, metadata, use_pandas_metadata,
memory_map, read_dictionary, filesystem, filters, buffer_size, partitioning,
use_legacy_dataset, ignore_prefixes)
1605 )
1606 try:
-> 1607 dataset = _ParquetDatasetV2(
1608 source,
1609 filesystem=filesystem,
~/miniconda3/envs/airflow/lib/python3.8/site-packages/pyarrow/parquet.py in
__init__(self, path_or_paths, filesystem, filters, partitioning,
read_dictionary, buffer_size, memory_map, ignore_prefixes, **kwargs)
1465 infer_dictionary=True)
1466
-> 1467 self._dataset = ds.dataset(path_or_paths, filesystem=filesystem,
1468 format=parquet_format,
1469 partitioning=partitioning,
~/miniconda3/envs/airflow/lib/python3.8/site-packages/pyarrow/dataset.py in
dataset(source, schema, format, filesystem, partitioning, partition_base_dir,
exclude_invalid_files, ignore_prefixes)
669 # TODO(kszucs): support InMemoryDataset for a table input
670 if _is_path_like(source):
--> 671 return _filesystem_dataset(source, **kwargs)
672 elif isinstance(source, (tuple, list)):
673 if all(_is_path_like(elem) for elem in source):
~/miniconda3/envs/airflow/lib/python3.8/site-packages/pyarrow/dataset.py in
_filesystem_dataset(source, schema, filesystem, partitioning, format,
partition_base_dir, exclude_invalid_files, selector_ignore_prefixes)
426 fs, paths_or_selector = _ensure_multiple_sources(source,
filesystem)
427 else:
--> 428 fs, paths_or_selector = _ensure_single_source(source,
filesystem)
429
430 options = FileSystemFactoryOptions(
~/miniconda3/envs/airflow/lib/python3.8/site-packages/pyarrow/dataset.py in
_ensure_single_source(path, filesystem)
402 paths_or_selector = [path]
403 else:
--> 404 raise FileNotFoundError(path)
405
406 return filesystem, paths_or_selector
FileNotFoundError: dev/testing10/evaluations
{code}
If I turn on use_legacy_dataset=True, then it works though and I am able to use
the write_dataset feature. So this is definitely some interaction between the
new dataset module and adlfs.
{code:python}
table = pq.read_table(source="dev/testing10/evaluations", columns=None,
filters=[('year', '==', '2020')], filesystem=fs, use_legacy_dataset=True)
ds.write_dataset(table,
base_dir="dev/adlfs-0.5.5",
format="parquet",
partitioning=ds.DirectoryPartitioning(pa.schema([("year",
pa.int64()), ("month", pa.string()), ("day", pa.string())])),
schema=table.schema,
filesystem=fs)
{code}
This does seem to create empty files for each partition as well, which is
strange, and the files are named with a part- prefix now instead of a UUID.
dev/adlfs-0.5.5/2020/11/15/part-2.parquet
dev/adlfs-0.5.5/2020/11/16/part-3.parquet
!ss2.PNG!
> [Python] Unable to read/write Parquet datasets with fsspec on Azure Blob
> ------------------------------------------------------------------------
>
> Key: ARROW-10517
> URL: https://issues.apache.org/jira/browse/ARROW-10517
> Project: Apache Arrow
> Issue Type: Bug
> Components: Python
> Affects Versions: 2.0.0
> Environment: Ubuntu 18.04
> Reporter: Lance Dacey
> Priority: Major
> Labels: azureblob, dataset, dataset-parquet-read,
> dataset-parquet-write, fsspec
> Attachments: ss.PNG, ss2.PNG
>
>
>
> {code:python}
> # adal==1.2.5
> # adlfs==0.2.5
> # fsspec==0.7.4
> # pandas==1.1.3
> # pyarrow==2.0.0
> # azure-storage-blob==2.1.0
> # azure-storage-common==2.1.0
> import pyarrow.dataset as ds
> import fsspec
> from pyarrow.dataset import DirectoryPartitioning
> fs = fsspec.filesystem(protocol='abfs',
> account_name=base.login,
> account_key=base.password)
> ds.write_dataset(data=table,
> base_dir="dev/test7",
> basename_template=None,
> format="parquet",
> partitioning=DirectoryPartitioning(pa.schema([("year",
> pa.string()), ("month", pa.string()), ("day", pa.string())])),
> schema=table.schema,
> filesystem=fs,
> )
> {code}
> I think this is due to early versions of adlfs having mkdir(). Although I
> use write_to_dataset and write_table all of the time, so I am not sure why
> this would be an issue.
> {code:python}
> ---------------------------------------------------------------------------
> RuntimeError Traceback (most recent call last)
> <ipython-input-40-bb38d83f896e> in <module>
> 13
> 14
> ---> 15 ds.write_dataset(data=table,
> 16 base_dir="dev/test7",
> 17 basename_template=None,
> /opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in
> write_dataset(data, base_dir, basename_template, format, partitioning,
> schema, filesystem, file_options, use_threads)
> 771 filesystem, _ = _ensure_fs(filesystem)
> 772
> --> 773 _filesystemdataset_write(
> 774 data, base_dir, basename_template, schema,
> 775 filesystem, partitioning, file_options, use_threads,
> /opt/conda/lib/python3.8/site-packages/pyarrow/_dataset.pyx in
> pyarrow._dataset._filesystemdataset_write()
> /opt/conda/lib/python3.8/site-packages/pyarrow/_fs.pyx in
> pyarrow._fs._cb_create_dir()
> /opt/conda/lib/python3.8/site-packages/pyarrow/fs.py in create_dir(self,
> path, recursive)
> 226 def create_dir(self, path, recursive):
> 227 # mkdir also raises FileNotFoundError when base directory is
> not found
> --> 228 self.fs.mkdir(path, create_parents=recursive)
> 229
> 230 def delete_dir(self, path):
> /opt/conda/lib/python3.8/site-packages/adlfs/core.py in mkdir(self, path,
> delimiter, exists_ok, **kwargs)
> 561 else:
> 562 ## everything else
> --> 563 raise RuntimeError(f"Cannot create
> {container_name}{delimiter}{path}.")
> 564 else:
> 565 if container_name in self.ls("") and path:
> RuntimeError: Cannot create dev/test7/2020/01/28.
> {code}
>
> Next, if I try to read a dataset (keep in mind that this works with
> read_table and ParquetDataset):
> {code:python}
> ds.dataset(source="dev/staging/evaluations",
> format="parquet",
> partitioning="hive",
> exclude_invalid_files=False,
> filesystem=fs
> )
> {code}
>
> This doesn't seem to respect the filesystem connected to Azure Blob.
> {code:python}
> ---------------------------------------------------------------------------
> FileNotFoundError Traceback (most recent call last)
> <ipython-input-41-4de65fe95db7> in <module>
> ----> 1 ds.dataset(source="dev/staging/evaluations",
> 2 format="parquet",
> 3 partitioning="hive",
> 4 exclude_invalid_files=False,
> 5 filesystem=fs
> /opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in dataset(source,
> schema, format, filesystem, partitioning, partition_base_dir,
> exclude_invalid_files, ignore_prefixes)
> 669 # TODO(kszucs): support InMemoryDataset for a table input
> 670 if _is_path_like(source):
> --> 671 return _filesystem_dataset(source, **kwargs)
> 672 elif isinstance(source, (tuple, list)):
> 673 if all(_is_path_like(elem) for elem in source):
> /opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in
> _filesystem_dataset(source, schema, filesystem, partitioning, format,
> partition_base_dir, exclude_invalid_files, selector_ignore_prefixes)
> 426 fs, paths_or_selector = _ensure_multiple_sources(source,
> filesystem)
> 427 else:
> --> 428 fs, paths_or_selector = _ensure_single_source(source,
> filesystem)
> 429
> 430 options = FileSystemFactoryOptions(
> /opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in
> _ensure_single_source(path, filesystem)
> 402 paths_or_selector = [path]
> 403 else:
> --> 404 raise FileNotFoundError(path)
> 405
> 406 return filesystem, paths_or_selector
> FileNotFoundError: dev/staging/evaluations
> {code}
> This *does* work though when I list the blobs before passing them to
> ds.dataset:
> {code:python}
> blobs = wasb.list_blobs(container_name="dev", prefix="staging/evaluations")
> dataset = ds.dataset(source=["dev/" + blob.name for blob in blobs],
> format="parquet",
> partitioning="hive",
> exclude_invalid_files=False,
> filesystem=fs)
> {code}
> Next, if I downgrade to pyarrow 1.0.1, I am able to read datasets (but there
> is no write_dataset):
> {code:python}
> # adal==1.2.5
> # adlfs==0.2.5
> # azure-storage-blob==2.1.0
> # azure-storage-common==2.1.0
> # fsspec==0.7.4
> # pandas==1.1.3
> # pyarrow==1.0.1
> dataset = ds.dataset("dev/staging/evaluations", format="parquet",
> filesystem=fs)
> dataset.to_table().to_pandas()
> {code}
> edit:
> pyarrow 2.0.0
> fsspec 0.8.4
> adlfs v0.5.5
> pandas 1.1.4
> numpy 1.19.4
> azure.storage.blob 12.6.0
> {code:python}
> x = adlfs.AzureBlobFileSystem(account_name=name, account_key=key)
> type(x.find("dev/test", detail=True))
> list
> fs = fsspec.filesystem(protocol="abfs", account_name=name, account_key=key)
> type(fs.find("dev/test", detail=True))
> list
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)