This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new aaf467a  ARROW-9644: [C++][Dataset] Don't apply ignore_prefixes to 
partition base_dir
aaf467a is described below

commit aaf467a6aa47f61a32b75e9e337434b300883d04
Author: Benjamin Kietzman <[email protected]>
AuthorDate: Thu Aug 13 16:18:55 2020 +0200

    ARROW-9644: [C++][Dataset] Don't apply ignore_prefixes to partition base_dir
    
    I still apply ignore_prefixes to all segments of paths yielded by a 
selector which lie *outside* an explicit partition base directory.
    
    Closes #7907 from bkietz/9644-ignore_prefixes-base_dir
    
    Lead-authored-by: Benjamin Kietzman <[email protected]>
    Co-authored-by: Antoine Pitrou <[email protected]>
    Co-authored-by: Joris Van den Bossche <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/dataset/discovery.cc      | 12 +++++++--
 cpp/src/arrow/dataset/discovery_test.cc | 25 +++++++++++++++++-
 cpp/src/arrow/python/filesystem.cc      | 10 +++++++
 cpp/src/arrow/python/filesystem.h       |  5 ++++
 python/pyarrow/_fs.pyx                  | 46 +++++++++++++++++++++++----------
 python/pyarrow/dataset.py               | 10 +++----
 python/pyarrow/fs.py                    |  4 ++-
 python/pyarrow/includes/libarrow_fs.pxd |  2 ++
 python/pyarrow/tests/test_dataset.py    |  6 ++---
 python/pyarrow/tests/test_fs.py         | 12 +++++++++
 python/pyarrow/tests/test_parquet.py    |  5 +++-
 11 files changed, 110 insertions(+), 27 deletions(-)

diff --git a/cpp/src/arrow/dataset/discovery.cc 
b/cpp/src/arrow/dataset/discovery.cc
index c082eaa..080703d 100644
--- a/cpp/src/arrow/dataset/discovery.cc
+++ b/cpp/src/arrow/dataset/discovery.cc
@@ -30,6 +30,7 @@
 #include "arrow/dataset/type_fwd.h"
 #include "arrow/filesystem/path_forest.h"
 #include "arrow/filesystem/path_util.h"
+#include "arrow/util/logging.h"
 
 namespace arrow {
 namespace dataset {
@@ -175,15 +176,22 @@ Result<std::shared_ptr<DatasetFactory>> 
FileSystemDatasetFactory::Make(
     options.partition_base_dir = selector.base_dir;
   }
 
+  ARROW_ASSIGN_OR_RAISE(selector.base_dir, 
filesystem->NormalizePath(selector.base_dir));
   ARROW_ASSIGN_OR_RAISE(auto files, filesystem->GetFileInfo(selector));
 
   // Filter out anything that's not a file or that's explicitly ignored
   auto files_end =
       std::remove_if(files.begin(), files.end(), [&](const fs::FileInfo& info) 
{
-        if (!info.IsFile() ||
-            StartsWithAnyOf(info.path(), options.selector_ignore_prefixes)) {
+        if (!info.IsFile()) return true;
+
+        auto relative = fs::internal::RemoveAncestor(selector.base_dir, 
info.path());
+        DCHECK(relative.has_value())
+            << "GetFileInfo() yielded path outside selector.base_dir";
+
+        if (StartsWithAnyOf(relative->to_string(), 
options.selector_ignore_prefixes)) {
           return true;
         }
+
         return false;
       });
   files.erase(files_end, files.end());
diff --git a/cpp/src/arrow/dataset/discovery_test.cc 
b/cpp/src/arrow/dataset/discovery_test.cc
index a3f1379..bc330ed 100644
--- a/cpp/src/arrow/dataset/discovery_test.cc
+++ b/cpp/src/arrow/dataset/discovery_test.cc
@@ -220,6 +220,8 @@ TEST_F(FileSystemDatasetFactoryTest, MissingDirectories) {
 }
 
 TEST_F(FileSystemDatasetFactoryTest, OptionsIgnoredDefaultPrefixes) {
+  // When constructing a factory from a FileSelector,
+  // `selector_ignore_prefixes` governs which files are filtered out.
   selector_.recursive = true;
   MakeFactory({
       fs::File("."),
@@ -234,6 +236,8 @@ TEST_F(FileSystemDatasetFactoryTest, 
OptionsIgnoredDefaultPrefixes) {
 }
 
 TEST_F(FileSystemDatasetFactoryTest, OptionsIgnoredDefaultExplicitFiles) {
+  // When constructing a factory from an explicit list of paths,
+  // `selector_ignore_prefixes` is ignored.
   selector_.recursive = true;
   std::vector<fs::FileInfo> ignored_by_default = {
       fs::File(".ignored_by_default.parquet"),
@@ -266,7 +270,7 @@ TEST_F(FileSystemDatasetFactoryTest, 
OptionsIgnoredCustomPrefixes) {
 }
 
 TEST_F(FileSystemDatasetFactoryTest, OptionsIgnoredNoPrefixes) {
-  // ignore nothing
+  // Ignore nothing
   selector_.recursive = true;
   factory_options_.selector_ignore_prefixes = {};
   MakeFactory({
@@ -282,6 +286,25 @@ TEST_F(FileSystemDatasetFactoryTest, 
OptionsIgnoredNoPrefixes) {
                          "not_ignored_by_default_either/dat"});
 }
 
+TEST_F(FileSystemDatasetFactoryTest, OptionsIgnoredPrefixesWithBaseDirectory) {
+  // ARROW-9644: the selector base_dir shouldn't be filtered out even if it 
matches
+  // `selector_ignore_prefixes`.
+  std::string dir = "_shouldnt_be_ignored/.dataset/";
+  selector_.base_dir = dir;
+  selector_.recursive = true;
+  MakeFactory({
+      fs::File(dir + "."),
+      fs::File(dir + "_"),
+      fs::File(dir + "_$folder$/dat"),
+      fs::File(dir + "_SUCCESS"),
+      fs::File(dir + "not_ignored_by_default"),
+      fs::File(dir + "not_ignored_by_default_either/dat"),
+  });
+
+  AssertFinishWithPaths(
+      {dir + "not_ignored_by_default", dir + 
"not_ignored_by_default_either/dat"});
+}
+
 TEST_F(FileSystemDatasetFactoryTest, Inspect) {
   auto s = schema({field("f64", float64())});
   format_ = std::make_shared<DummyFileFormat>(s);
diff --git a/cpp/src/arrow/python/filesystem.cc 
b/cpp/src/arrow/python/filesystem.cc
index c68059d..8e8e8a6 100644
--- a/cpp/src/arrow/python/filesystem.cc
+++ b/cpp/src/arrow/python/filesystem.cc
@@ -191,6 +191,16 @@ Result<std::shared_ptr<io::OutputStream>> 
PyFileSystem::OpenAppendStream(
   return stream;
 }
 
+Result<std::string> PyFileSystem::NormalizePath(std::string path) {
+  std::string normalized;
+  auto st = SafeCallIntoPython([&]() -> Status {
+    vtable_.normalize_path(handler_.obj(), path, &normalized);
+    return CheckPyError();
+  });
+  RETURN_NOT_OK(st);
+  return normalized;
+}
+
 }  // namespace fs
 }  // namespace py
 }  // namespace arrow
diff --git a/cpp/src/arrow/python/filesystem.h 
b/cpp/src/arrow/python/filesystem.h
index 5d04bcc..f2d9c90 100644
--- a/cpp/src/arrow/python/filesystem.h
+++ b/cpp/src/arrow/python/filesystem.h
@@ -65,6 +65,9 @@ class ARROW_PYTHON_EXPORT PyFileSystemVtable {
   std::function<void(PyObject*, const std::string& path,
                      std::shared_ptr<io::OutputStream>* out)>
       open_append_stream;
+
+  std::function<void(PyObject*, const std::string& path, std::string* out)>
+      normalize_path;
 };
 
 class ARROW_PYTHON_EXPORT PyFileSystem : public arrow::fs::FileSystem {
@@ -105,6 +108,8 @@ class ARROW_PYTHON_EXPORT PyFileSystem : public 
arrow::fs::FileSystem {
   Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(
       const std::string& path) override;
 
+  Result<std::string> NormalizePath(std::string path) override;
+
   PyObject* handler() const { return handler_.obj(); }
 
  private:
diff --git a/python/pyarrow/_fs.pyx b/python/pyarrow/_fs.pyx
index dbcc580..d435de1 100644
--- a/python/pyarrow/_fs.pyx
+++ b/python/pyarrow/_fs.pyx
@@ -62,20 +62,6 @@ cdef inline c_string _path_as_bytes(path) except *:
     return tobytes(path)
 
 
-def _normalize_path(FileSystem filesystem, path):
-    """
-    Normalize path for the given filesystem.
-
-    The default implementation of this method is a no-op, but subclasses
-    may allow normalizing irregular path forms (such as Windows local paths).
-    """
-    cdef c_string c_path = _path_as_bytes(path)
-    cdef c_string c_path_normalized
-
-    c_path_normalized = GetResultValue(filesystem.fs.NormalizePath(c_path))
-    return frombytes(c_path_normalized)
-
-
 cdef object _wrap_file_type(CFileType ty):
     return FileType(<int8_t> ty)
 
@@ -709,6 +695,27 @@ cdef class FileSystem(_Weakrefable):
             stream, path=path, compression=compression, buffer_size=buffer_size
         )
 
+    def normalize_path(self, path):
+        """
+        Normalize filesystem path.
+
+        Parameters
+        ----------
+        path : str
+            The path to normalize
+
+        Returns
+        -------
+        normalized_path : str
+            The normalized path
+        """
+        cdef:
+            c_string c_path = _path_as_bytes(path)
+            c_string c_path_normalized
+
+        c_path_normalized = GetResultValue(self.fs.NormalizePath(c_path))
+        return frombytes(c_path_normalized)
+
 
 cdef class LocalFileSystem(FileSystem):
     """
@@ -851,6 +858,7 @@ cdef class PyFileSystem(FileSystem):
         vtable.open_input_file = _cb_open_input_file
         vtable.open_output_stream = _cb_open_output_stream
         vtable.open_append_stream = _cb_open_append_stream
+        vtable.normalize_path = _cb_normalize_path
 
         wrapped = CPyFileSystem.Make(handler, move(vtable))
         self.init(<shared_ptr[CFileSystem]> wrapped)
@@ -963,6 +971,12 @@ class FileSystemHandler(ABC):
         Implement PyFileSystem.open_append_stream(...).
         """
 
+    @abstractmethod
+    def normalize_path(self, path):
+        """
+        Implement PyFileSystem.normalize_path(...).
+        """
+
 
 # Callback definitions for CPyFileSystemVtable
 
@@ -1058,3 +1072,7 @@ cdef void _cb_open_append_stream(handler, const c_string& 
path,
         raise TypeError("open_append_stream should have returned "
                         "a PyArrow file")
     out[0] = (<NativeFile> stream).get_output_stream()
+
+cdef void _cb_normalize_path(handler, const c_string& path,
+                             c_string* out) except *:
+    out[0] = tobytes(handler.normalize_path(frombytes(path)))
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index fd03aee..19d39f1 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -18,7 +18,7 @@
 """Dataset is currently unstable. APIs subject to change without notice."""
 
 import pyarrow as pa
-from pyarrow.fs import _normalize_path, _MockFileSystem
+from pyarrow.fs import _MockFileSystem
 from pyarrow.util import _stringify_path, _is_path_like
 
 from pyarrow._dataset import (  # noqa
@@ -227,7 +227,7 @@ def _ensure_fs(fs_or_uri):
         # component then it will be treated as a path prefix
         filesystem, prefix = FileSystem.from_uri(fs_or_uri)
         is_local = isinstance(filesystem, LocalFileSystem)
-        prefix = _normalize_path(filesystem, prefix)
+        prefix = filesystem.normalize_path(prefix)
         if prefix:
             # validate that the prefix is pointing to a directory
             prefix_info = filesystem.get_file_info([prefix])[0]
@@ -294,7 +294,7 @@ def _ensure_multiple_sources(paths, filesystem=None):
     filesystem, is_local = _ensure_fs(filesystem)
 
     # allow normalizing irregular paths such as Windows local paths
-    paths = [_normalize_path(filesystem, _stringify_path(p)) for p in paths]
+    paths = [filesystem.normalize_path(_stringify_path(p)) for p in paths]
 
     # validate that all of the paths are pointing to existing *files*
     # possible improvement is to group the file_infos by type and raise for
@@ -384,7 +384,7 @@ def _ensure_single_source(path, filesystem=None):
     filesystem, _ = _ensure_fs(filesystem)
 
     # ensure that the path is normalized before passing to dataset discovery
-    path = _normalize_path(filesystem, path)
+    path = filesystem.normalize_path(path)
 
     # retrieve the file descriptor
     if file_info is None:
@@ -502,7 +502,7 @@ def parquet_dataset(metadata_path, schema=None, 
filesystem=None, format=None,
     else:
         filesystem, _ = _ensure_fs(filesystem)
 
-    metadata_path = _normalize_path(filesystem, _stringify_path(metadata_path))
+    metadata_path = filesystem.normalize_path(_stringify_path(metadata_path))
     options = ParquetFactoryOptions(
         partition_base_dir=partition_base_dir,
         partitioning=_ensure_partitioning(partitioning)
diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py
index fae96c1..69404b2 100644
--- a/python/pyarrow/fs.py
+++ b/python/pyarrow/fs.py
@@ -27,7 +27,6 @@ from pyarrow._fs import (  # noqa
     LocalFileSystem,
     SubTreeFileSystem,
     _MockFileSystem,
-    _normalize_path,
     FileSystemHandler,
     PyFileSystem,
 )
@@ -117,6 +116,9 @@ class FSSpecHandler(FileSystemHandler):
             protocol = protocol[0]
         return "fsspec+{0}".format(protocol)
 
+    def normalize_path(self, path):
+        return path
+
     @staticmethod
     def _create_file_info(path, info):
         size = info["size"]
diff --git a/python/pyarrow/includes/libarrow_fs.pxd 
b/python/pyarrow/includes/libarrow_fs.pxd
index ad18c7a..95fe6a3 100644
--- a/python/pyarrow/includes/libarrow_fs.pxd
+++ b/python/pyarrow/includes/libarrow_fs.pxd
@@ -209,6 +209,7 @@ ctypedef void CallbackOpenInputFile(object, const c_string&,
                                     shared_ptr[CRandomAccessFile]*)
 ctypedef void CallbackOpenOutputStream(object, const c_string&,
                                        shared_ptr[COutputStream]*)
+ctypedef void CallbackNormalizePath(object, const c_string&, c_string*)
 
 cdef extern from "arrow/python/filesystem.h" namespace "arrow::py::fs" nogil:
 
@@ -230,6 +231,7 @@ cdef extern from "arrow/python/filesystem.h" namespace 
"arrow::py::fs" nogil:
         function[CallbackOpenInputFile] open_input_file
         function[CallbackOpenOutputStream] open_output_stream
         function[CallbackOpenOutputStream] open_append_stream
+        function[CallbackNormalizePath] normalize_path
 
     cdef cppclass CPyFileSystem "arrow::py::fs::PyFileSystem":
         @staticmethod
diff --git a/python/pyarrow/tests/test_dataset.py 
b/python/pyarrow/tests/test_dataset.py
index 2313a06..1270614 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -127,18 +127,18 @@ def mockfs():
 
 @pytest.fixture
 def open_logging_fs(monkeypatch):
-    from pyarrow.fs import PyFileSystem, LocalFileSystem, _normalize_path
+    from pyarrow.fs import PyFileSystem, LocalFileSystem
     from .test_fs import ProxyHandler
 
     localfs = LocalFileSystem()
 
     def normalized(paths):
-        return {_normalize_path(localfs, str(p)) for p in paths}
+        return {localfs.normalize_path(str(p)) for p in paths}
 
     opened = set()
 
     def open_input_file(self, path):
-        path = _normalize_path(localfs, str(path))
+        path = localfs.normalize_path(str(path))
         opened.add(path)
         return self._fs.open_input_file(path)
 
diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py
index 7d1b6b1..50657ea 100644
--- a/python/pyarrow/tests/test_fs.py
+++ b/python/pyarrow/tests/test_fs.py
@@ -48,6 +48,9 @@ class DummyHandler(FileSystemHandler):
     def get_type_name(self):
         return "dummy"
 
+    def normalize_path(self, path):
+        return path
+
     def get_file_info(self, paths):
         info = []
         for path in paths:
@@ -151,6 +154,9 @@ class ProxyHandler(FileSystemHandler):
     def get_type_name(self):
         return "proxy::" + self._fs.type_name
 
+    def normalize_path(self, path):
+        return self._fs.normalize_path(path)
+
     def get_file_info(self, paths):
         return self._fs.get_file_info(paths)
 
@@ -577,6 +583,12 @@ def test_type_name():
     assert fs.type_name == "mock"
 
 
+def test_normalize_path(fs):
+    # Trivial path names (without separators) should generally be
+    # already normalized.  Just a sanity check.
+    assert fs.normalize_path("foo") == "foo"
+
+
 def test_non_path_like_input_raises(fs):
     class Path:
         pass
diff --git a/python/pyarrow/tests/test_parquet.py 
b/python/pyarrow/tests/test_parquet.py
index 07af08f..b87cd55 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -2662,7 +2662,7 @@ def test_ignore_hidden_files_underscore(tempdir, 
use_legacy_dataset):
 @pytest.mark.pandas
 @parametrize_legacy_dataset
 @pytest.mark.parametrize('dir_prefix', ['_', '.'])
-def test_ignore_no_private_directories_path_list(
+def test_ignore_no_private_directories_in_base_path(
     tempdir, dir_prefix, use_legacy_dataset
 ):
     # ARROW-8427 - don't ignore explicitly listed files if parent directory
@@ -2674,7 +2674,10 @@ def test_ignore_no_private_directories_path_list(
                                             file_nrows=5)
 
     dataset = pq.ParquetDataset(paths, use_legacy_dataset=use_legacy_dataset)
+    _assert_dataset_paths(dataset, paths, use_legacy_dataset)
 
+    # ARROW-9644 - don't ignore full directory with underscore in base path
+    dataset = pq.ParquetDataset(dirpath, use_legacy_dataset=use_legacy_dataset)
     _assert_dataset_paths(dataset, paths, use_legacy_dataset)
 
 

Reply via email to