This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new aaf467a ARROW-9644: [C++][Dataset] Don't apply ignore_prefixes to
partition base_dir
aaf467a is described below
commit aaf467a6aa47f61a32b75e9e337434b300883d04
Author: Benjamin Kietzman <[email protected]>
AuthorDate: Thu Aug 13 16:18:55 2020 +0200
ARROW-9644: [C++][Dataset] Don't apply ignore_prefixes to partition base_dir
I still apply ignore_prefixes to all segments of paths yielded by a
selector which lie *outside* an explicit partition base directory.
Closes #7907 from bkietz/9644-ignore_prefixes-base_dir
Lead-authored-by: Benjamin Kietzman <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Co-authored-by: Joris Van den Bossche <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/arrow/dataset/discovery.cc | 12 +++++++--
cpp/src/arrow/dataset/discovery_test.cc | 25 +++++++++++++++++-
cpp/src/arrow/python/filesystem.cc | 10 +++++++
cpp/src/arrow/python/filesystem.h | 5 ++++
python/pyarrow/_fs.pyx | 46 +++++++++++++++++++++++----------
python/pyarrow/dataset.py | 10 +++----
python/pyarrow/fs.py | 4 ++-
python/pyarrow/includes/libarrow_fs.pxd | 2 ++
python/pyarrow/tests/test_dataset.py | 6 ++---
python/pyarrow/tests/test_fs.py | 12 +++++++++
python/pyarrow/tests/test_parquet.py | 5 +++-
11 files changed, 110 insertions(+), 27 deletions(-)
diff --git a/cpp/src/arrow/dataset/discovery.cc
b/cpp/src/arrow/dataset/discovery.cc
index c082eaa..080703d 100644
--- a/cpp/src/arrow/dataset/discovery.cc
+++ b/cpp/src/arrow/dataset/discovery.cc
@@ -30,6 +30,7 @@
#include "arrow/dataset/type_fwd.h"
#include "arrow/filesystem/path_forest.h"
#include "arrow/filesystem/path_util.h"
+#include "arrow/util/logging.h"
namespace arrow {
namespace dataset {
@@ -175,15 +176,22 @@ Result<std::shared_ptr<DatasetFactory>>
FileSystemDatasetFactory::Make(
options.partition_base_dir = selector.base_dir;
}
+ ARROW_ASSIGN_OR_RAISE(selector.base_dir,
filesystem->NormalizePath(selector.base_dir));
ARROW_ASSIGN_OR_RAISE(auto files, filesystem->GetFileInfo(selector));
// Filter out anything that's not a file or that's explicitly ignored
auto files_end =
std::remove_if(files.begin(), files.end(), [&](const fs::FileInfo& info)
{
- if (!info.IsFile() ||
- StartsWithAnyOf(info.path(), options.selector_ignore_prefixes)) {
+ if (!info.IsFile()) return true;
+
+ auto relative = fs::internal::RemoveAncestor(selector.base_dir,
info.path());
+ DCHECK(relative.has_value())
+ << "GetFileInfo() yielded path outside selector.base_dir";
+
+ if (StartsWithAnyOf(relative->to_string(),
options.selector_ignore_prefixes)) {
return true;
}
+
return false;
});
files.erase(files_end, files.end());
diff --git a/cpp/src/arrow/dataset/discovery_test.cc
b/cpp/src/arrow/dataset/discovery_test.cc
index a3f1379..bc330ed 100644
--- a/cpp/src/arrow/dataset/discovery_test.cc
+++ b/cpp/src/arrow/dataset/discovery_test.cc
@@ -220,6 +220,8 @@ TEST_F(FileSystemDatasetFactoryTest, MissingDirectories) {
}
TEST_F(FileSystemDatasetFactoryTest, OptionsIgnoredDefaultPrefixes) {
+ // When constructing a factory from a FileSelector,
+ // `selector_ignore_prefixes` governs which files are filtered out.
selector_.recursive = true;
MakeFactory({
fs::File("."),
@@ -234,6 +236,8 @@ TEST_F(FileSystemDatasetFactoryTest,
OptionsIgnoredDefaultPrefixes) {
}
TEST_F(FileSystemDatasetFactoryTest, OptionsIgnoredDefaultExplicitFiles) {
+ // When constructing a factory from an explicit list of paths,
+ // `selector_ignore_prefixes` is ignored.
selector_.recursive = true;
std::vector<fs::FileInfo> ignored_by_default = {
fs::File(".ignored_by_default.parquet"),
@@ -266,7 +270,7 @@ TEST_F(FileSystemDatasetFactoryTest,
OptionsIgnoredCustomPrefixes) {
}
TEST_F(FileSystemDatasetFactoryTest, OptionsIgnoredNoPrefixes) {
- // ignore nothing
+ // Ignore nothing
selector_.recursive = true;
factory_options_.selector_ignore_prefixes = {};
MakeFactory({
@@ -282,6 +286,25 @@ TEST_F(FileSystemDatasetFactoryTest,
OptionsIgnoredNoPrefixes) {
"not_ignored_by_default_either/dat"});
}
+TEST_F(FileSystemDatasetFactoryTest, OptionsIgnoredPrefixesWithBaseDirectory) {
+  // ARROW-9644: the selector base_dir shouldn't be filtered out even if it
+  // matches `selector_ignore_prefixes`.
+ std::string dir = "_shouldnt_be_ignored/.dataset/";
+ selector_.base_dir = dir;
+ selector_.recursive = true;
+ MakeFactory({
+ fs::File(dir + "."),
+ fs::File(dir + "_"),
+ fs::File(dir + "_$folder$/dat"),
+ fs::File(dir + "_SUCCESS"),
+ fs::File(dir + "not_ignored_by_default"),
+ fs::File(dir + "not_ignored_by_default_either/dat"),
+ });
+
+ AssertFinishWithPaths(
+ {dir + "not_ignored_by_default", dir +
"not_ignored_by_default_either/dat"});
+}
+
TEST_F(FileSystemDatasetFactoryTest, Inspect) {
auto s = schema({field("f64", float64())});
format_ = std::make_shared<DummyFileFormat>(s);
diff --git a/cpp/src/arrow/python/filesystem.cc
b/cpp/src/arrow/python/filesystem.cc
index c68059d..8e8e8a6 100644
--- a/cpp/src/arrow/python/filesystem.cc
+++ b/cpp/src/arrow/python/filesystem.cc
@@ -191,6 +191,16 @@ Result<std::shared_ptr<io::OutputStream>>
PyFileSystem::OpenAppendStream(
return stream;
}
+Result<std::string> PyFileSystem::NormalizePath(std::string path) {
+ std::string normalized;
+ auto st = SafeCallIntoPython([&]() -> Status {
+ vtable_.normalize_path(handler_.obj(), path, &normalized);
+ return CheckPyError();
+ });
+ RETURN_NOT_OK(st);
+ return normalized;
+}
+
} // namespace fs
} // namespace py
} // namespace arrow
diff --git a/cpp/src/arrow/python/filesystem.h
b/cpp/src/arrow/python/filesystem.h
index 5d04bcc..f2d9c90 100644
--- a/cpp/src/arrow/python/filesystem.h
+++ b/cpp/src/arrow/python/filesystem.h
@@ -65,6 +65,9 @@ class ARROW_PYTHON_EXPORT PyFileSystemVtable {
std::function<void(PyObject*, const std::string& path,
std::shared_ptr<io::OutputStream>* out)>
open_append_stream;
+
+ std::function<void(PyObject*, const std::string& path, std::string* out)>
+ normalize_path;
};
class ARROW_PYTHON_EXPORT PyFileSystem : public arrow::fs::FileSystem {
@@ -105,6 +108,8 @@ class ARROW_PYTHON_EXPORT PyFileSystem : public
arrow::fs::FileSystem {
Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(
const std::string& path) override;
+ Result<std::string> NormalizePath(std::string path) override;
+
PyObject* handler() const { return handler_.obj(); }
private:
diff --git a/python/pyarrow/_fs.pyx b/python/pyarrow/_fs.pyx
index dbcc580..d435de1 100644
--- a/python/pyarrow/_fs.pyx
+++ b/python/pyarrow/_fs.pyx
@@ -62,20 +62,6 @@ cdef inline c_string _path_as_bytes(path) except *:
return tobytes(path)
-def _normalize_path(FileSystem filesystem, path):
- """
- Normalize path for the given filesystem.
-
- The default implementation of this method is a no-op, but subclasses
- may allow normalizing irregular path forms (such as Windows local paths).
- """
- cdef c_string c_path = _path_as_bytes(path)
- cdef c_string c_path_normalized
-
- c_path_normalized = GetResultValue(filesystem.fs.NormalizePath(c_path))
- return frombytes(c_path_normalized)
-
-
cdef object _wrap_file_type(CFileType ty):
return FileType(<int8_t> ty)
@@ -709,6 +695,27 @@ cdef class FileSystem(_Weakrefable):
stream, path=path, compression=compression, buffer_size=buffer_size
)
+ def normalize_path(self, path):
+ """
+ Normalize filesystem path.
+
+ Parameters
+ ----------
+ path : str
+ The path to normalize
+
+ Returns
+ -------
+ normalized_path : str
+ The normalized path
+ """
+ cdef:
+ c_string c_path = _path_as_bytes(path)
+ c_string c_path_normalized
+
+ c_path_normalized = GetResultValue(self.fs.NormalizePath(c_path))
+ return frombytes(c_path_normalized)
+
cdef class LocalFileSystem(FileSystem):
"""
@@ -851,6 +858,7 @@ cdef class PyFileSystem(FileSystem):
vtable.open_input_file = _cb_open_input_file
vtable.open_output_stream = _cb_open_output_stream
vtable.open_append_stream = _cb_open_append_stream
+ vtable.normalize_path = _cb_normalize_path
wrapped = CPyFileSystem.Make(handler, move(vtable))
self.init(<shared_ptr[CFileSystem]> wrapped)
@@ -963,6 +971,12 @@ class FileSystemHandler(ABC):
Implement PyFileSystem.open_append_stream(...).
"""
+ @abstractmethod
+ def normalize_path(self, path):
+ """
+ Implement PyFileSystem.normalize_path(...).
+ """
+
# Callback definitions for CPyFileSystemVtable
@@ -1058,3 +1072,7 @@ cdef void _cb_open_append_stream(handler, const c_string&
path,
raise TypeError("open_append_stream should have returned "
"a PyArrow file")
out[0] = (<NativeFile> stream).get_output_stream()
+
+cdef void _cb_normalize_path(handler, const c_string& path,
+ c_string* out) except *:
+ out[0] = tobytes(handler.normalize_path(frombytes(path)))
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index fd03aee..19d39f1 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -18,7 +18,7 @@
"""Dataset is currently unstable. APIs subject to change without notice."""
import pyarrow as pa
-from pyarrow.fs import _normalize_path, _MockFileSystem
+from pyarrow.fs import _MockFileSystem
from pyarrow.util import _stringify_path, _is_path_like
from pyarrow._dataset import ( # noqa
@@ -227,7 +227,7 @@ def _ensure_fs(fs_or_uri):
# component then it will be treated as a path prefix
filesystem, prefix = FileSystem.from_uri(fs_or_uri)
is_local = isinstance(filesystem, LocalFileSystem)
- prefix = _normalize_path(filesystem, prefix)
+ prefix = filesystem.normalize_path(prefix)
if prefix:
# validate that the prefix is pointing to a directory
prefix_info = filesystem.get_file_info([prefix])[0]
@@ -294,7 +294,7 @@ def _ensure_multiple_sources(paths, filesystem=None):
filesystem, is_local = _ensure_fs(filesystem)
# allow normalizing irregular paths such as Windows local paths
- paths = [_normalize_path(filesystem, _stringify_path(p)) for p in paths]
+ paths = [filesystem.normalize_path(_stringify_path(p)) for p in paths]
# validate that all of the paths are pointing to existing *files*
# possible improvement is to group the file_infos by type and raise for
@@ -384,7 +384,7 @@ def _ensure_single_source(path, filesystem=None):
filesystem, _ = _ensure_fs(filesystem)
# ensure that the path is normalized before passing to dataset discovery
- path = _normalize_path(filesystem, path)
+ path = filesystem.normalize_path(path)
# retrieve the file descriptor
if file_info is None:
@@ -502,7 +502,7 @@ def parquet_dataset(metadata_path, schema=None,
filesystem=None, format=None,
else:
filesystem, _ = _ensure_fs(filesystem)
- metadata_path = _normalize_path(filesystem, _stringify_path(metadata_path))
+ metadata_path = filesystem.normalize_path(_stringify_path(metadata_path))
options = ParquetFactoryOptions(
partition_base_dir=partition_base_dir,
partitioning=_ensure_partitioning(partitioning)
diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py
index fae96c1..69404b2 100644
--- a/python/pyarrow/fs.py
+++ b/python/pyarrow/fs.py
@@ -27,7 +27,6 @@ from pyarrow._fs import ( # noqa
LocalFileSystem,
SubTreeFileSystem,
_MockFileSystem,
- _normalize_path,
FileSystemHandler,
PyFileSystem,
)
@@ -117,6 +116,9 @@ class FSSpecHandler(FileSystemHandler):
protocol = protocol[0]
return "fsspec+{0}".format(protocol)
+ def normalize_path(self, path):
+ return path
+
@staticmethod
def _create_file_info(path, info):
size = info["size"]
diff --git a/python/pyarrow/includes/libarrow_fs.pxd
b/python/pyarrow/includes/libarrow_fs.pxd
index ad18c7a..95fe6a3 100644
--- a/python/pyarrow/includes/libarrow_fs.pxd
+++ b/python/pyarrow/includes/libarrow_fs.pxd
@@ -209,6 +209,7 @@ ctypedef void CallbackOpenInputFile(object, const c_string&,
shared_ptr[CRandomAccessFile]*)
ctypedef void CallbackOpenOutputStream(object, const c_string&,
shared_ptr[COutputStream]*)
+ctypedef void CallbackNormalizePath(object, const c_string&, c_string*)
cdef extern from "arrow/python/filesystem.h" namespace "arrow::py::fs" nogil:
@@ -230,6 +231,7 @@ cdef extern from "arrow/python/filesystem.h" namespace
"arrow::py::fs" nogil:
function[CallbackOpenInputFile] open_input_file
function[CallbackOpenOutputStream] open_output_stream
function[CallbackOpenOutputStream] open_append_stream
+ function[CallbackNormalizePath] normalize_path
cdef cppclass CPyFileSystem "arrow::py::fs::PyFileSystem":
@staticmethod
diff --git a/python/pyarrow/tests/test_dataset.py
b/python/pyarrow/tests/test_dataset.py
index 2313a06..1270614 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -127,18 +127,18 @@ def mockfs():
@pytest.fixture
def open_logging_fs(monkeypatch):
- from pyarrow.fs import PyFileSystem, LocalFileSystem, _normalize_path
+ from pyarrow.fs import PyFileSystem, LocalFileSystem
from .test_fs import ProxyHandler
localfs = LocalFileSystem()
def normalized(paths):
- return {_normalize_path(localfs, str(p)) for p in paths}
+ return {localfs.normalize_path(str(p)) for p in paths}
opened = set()
def open_input_file(self, path):
- path = _normalize_path(localfs, str(path))
+ path = localfs.normalize_path(str(path))
opened.add(path)
return self._fs.open_input_file(path)
diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py
index 7d1b6b1..50657ea 100644
--- a/python/pyarrow/tests/test_fs.py
+++ b/python/pyarrow/tests/test_fs.py
@@ -48,6 +48,9 @@ class DummyHandler(FileSystemHandler):
def get_type_name(self):
return "dummy"
+ def normalize_path(self, path):
+ return path
+
def get_file_info(self, paths):
info = []
for path in paths:
@@ -151,6 +154,9 @@ class ProxyHandler(FileSystemHandler):
def get_type_name(self):
return "proxy::" + self._fs.type_name
+ def normalize_path(self, path):
+ return self._fs.normalize_path(path)
+
def get_file_info(self, paths):
return self._fs.get_file_info(paths)
@@ -577,6 +583,12 @@ def test_type_name():
assert fs.type_name == "mock"
+def test_normalize_path(fs):
+ # Trivial path names (without separators) should generally be
+ # already normalized. Just a sanity check.
+ assert fs.normalize_path("foo") == "foo"
+
+
def test_non_path_like_input_raises(fs):
class Path:
pass
diff --git a/python/pyarrow/tests/test_parquet.py
b/python/pyarrow/tests/test_parquet.py
index 07af08f..b87cd55 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -2662,7 +2662,7 @@ def test_ignore_hidden_files_underscore(tempdir,
use_legacy_dataset):
@pytest.mark.pandas
@parametrize_legacy_dataset
@pytest.mark.parametrize('dir_prefix', ['_', '.'])
-def test_ignore_no_private_directories_path_list(
+def test_ignore_no_private_directories_in_base_path(
tempdir, dir_prefix, use_legacy_dataset
):
# ARROW-8427 - don't ignore explicitly listed files if parent directory
@@ -2674,7 +2674,10 @@ def test_ignore_no_private_directories_path_list(
file_nrows=5)
dataset = pq.ParquetDataset(paths, use_legacy_dataset=use_legacy_dataset)
+ _assert_dataset_paths(dataset, paths, use_legacy_dataset)
+ # ARROW-9644 - don't ignore full directory with underscore in base path
+ dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
_assert_dataset_paths(dataset, paths, use_legacy_dataset)