pitrou commented on a change in pull request #12530:
URL: https://github.com/apache/arrow/pull/12530#discussion_r833977881



##########
File path: cpp/src/arrow/dataset/partition.h
##########
@@ -76,7 +78,11 @@ class ARROW_DS_EXPORT Partitioning {
   /// \brief Parse a path into a partition expression
   virtual Result<compute::Expression> Parse(const std::string& path) const = 0;
 
-  virtual Result<std::string> Format(const compute::Expression& expr) const = 0;
+  struct PartitionPathFormat {
+    std::string directory, prefix;
+  };
+
+  virtual Result<PartitionPathFormat> Format(const compute::Expression& expr) const = 0;

Review comment:
       This looks nice to me, perhaps you can add a short docstring too?
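       For illustration, a docstring along these lines might do (just a sketch, inferring the semantics from the two `FormatValues` implementations in this PR: `directory` is joined onto the base path, `prefix` is prepended to file basenames):

   ```cpp
   /// \brief The result of formatting a partition expression.
   ///
   /// `directory` is appended to the base path as nested subdirectories,
   /// while `prefix` (possibly empty) is prepended to the basenames of
   /// files written into that directory.
   struct PartitionPathFormat {
     std::string directory, prefix;
   };
   ```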

##########
File path: cpp/src/arrow/dataset/partition.cc
##########
@@ -361,7 +412,32 @@ Result<std::string> DirectoryPartitioning::FormatValues(
     break;
   }
 
-  return fs::internal::JoinAbstractPath(std::move(segments));
+  return PartitionPathFormat{fs::internal::JoinAbstractPath(std::move(segments)), ""};
+}
+
+Result<Partitioning::PartitionPathFormat> FilenamePartitioning::FormatValues(
+    const ScalarVector& values) const {
+  std::vector<std::string> segments(static_cast<size_t>(schema_->num_fields()));

Review comment:
       This method is identical to `DirectoryPartitioning::FormatValues` except for the last line. Can you factor out the `segments` computation?
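       One possible shape, purely as a sketch (`FormatPartitionSegments` is a hypothetical helper name, and the elided loop stands for the per-field formatting both overrides currently share):

   ```cpp
   // Hypothetical protected helper on KeyValuePartitioning holding the
   // shared per-field formatting loop.
   Result<std::vector<std::string>> KeyValuePartitioning::FormatPartitionSegments(
       const ScalarVector& values) const {
     std::vector<std::string> segments(static_cast<size_t>(schema_->num_fields()));
     // ... the loop currently duplicated in both FormatValues overrides ...
     return segments;
   }

   Result<Partitioning::PartitionPathFormat> DirectoryPartitioning::FormatValues(
       const ScalarVector& values) const {
     ARROW_ASSIGN_OR_RAISE(auto segments, FormatPartitionSegments(values));
     // Directory flavor: segments become nested directories, no filename prefix.
     return PartitionPathFormat{fs::internal::JoinAbstractPath(std::move(segments)), ""};
   }

   Result<Partitioning::PartitionPathFormat> FilenamePartitioning::FormatValues(
       const ScalarVector& values) const {
     ARROW_ASSIGN_OR_RAISE(auto segments, FormatPartitionSegments(values));
     // Filename flavor: join the segments into a "_"-separated filename
     // prefix instead, e.g. "2009_11_" ahead of "part-0.parquet" (exact
     // separator handling per whatever the PR settles on).
     std::string prefix;
     for (const auto& segment : segments) {
       prefix += segment + "_";
     }
     return PartitionPathFormat{"", std::move(prefix)};
   }
   ```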

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -1592,6 +1593,145 @@ cdef class HivePartitioning(Partitioning):
             res.append(pyarrow_wrap_array(arr))
         return res
 
+cdef class FilenamePartitioning(Partitioning):
+    """
+    A Partitioning based on a specified Schema.
+
+    The FilenamePartitioning expects one segment in the file name for each
+    field in the schema (all fields are required to be present) separated
+    by '_'. For example, given schema<year:int16, month:int8> the name
+    "2009_11" would be parsed to ("year"_ == 2009 and "month"_ == 11).
+
+    Parameters
+    ----------
+    schema : Schema
+        The schema that describes the partitions present in the file path.
+    dictionaries : dict[str, Array]
+        If the type of any field of `schema` is a dictionary type, the
+        corresponding entry of `dictionaries` must be an array containing
+        every value which may be taken by the corresponding column or an
+        error will be raised in parsing.
+    segment_encoding : str, default "uri"
+        After splitting paths into segments, decode the segments. Valid
+        values are "uri" (URI-decode segments) and "none" (leave as-is).
+
+    Returns
+    -------
+    FilenamePartitioning
+
+    Examples
+    --------
+    >>> from pyarrow.dataset import FilenamePartitioning
+    >>> import pyarrow as pa
+    >>> partitioning = FilenamePartitioning(
+    ...     pa.schema([("year", pa.int16()), ("month", pa.int8())]))
+    >>> print(partitioning.parse("2009_11"))
+    ((year == 2009:int16) and (month == 11:int8))
+    """
+
+    cdef:
+        CFilenamePartitioning* filename_partitioning
+
+    def __init__(self, Schema schema not None, dictionaries=None,
+                 segment_encoding="uri"):
+        cdef:
+            shared_ptr[CFilenamePartitioning] c_partitioning
+            CKeyValuePartitioningOptions c_options
+
+        c_options.segment_encoding = _get_segment_encoding(segment_encoding)
+        c_partitioning = make_shared[CFilenamePartitioning](
+            pyarrow_unwrap_schema(schema),
+            _partitioning_dictionaries(schema, dictionaries),
+            c_options,
+        )
+        self.init(<shared_ptr[CPartitioning]> c_partitioning)
+
+    cdef init(self, const shared_ptr[CPartitioning]& sp):
+        Partitioning.init(self, sp)
+        self.filename_partitioning = <CFilenamePartitioning*> sp.get()
+
+    @staticmethod
+    def discover(field_names=None, infer_dictionary=False,
+                 max_partition_dictionary_size=0,
+                 schema=None, segment_encoding="uri"):
+        """
+        Discover a FilenamePartitioning.
+
+        Parameters
+        ----------
+        field_names : list of str
+            The names to associate with the values from the filename
+            segments. If schema is given, will be populated from the schema.
+        infer_dictionary : bool, default False
+            When inferring a schema for partition fields, yield dictionary
+            encoded types instead of plain types. This can be more efficient
+            when materializing virtual columns, and Expressions parsed by the
+            finished Partitioning will include dictionaries of all unique
+            inspected values for each field.
+        max_partition_dictionary_size : int, default 0
+            Synonymous with infer_dictionary for backwards compatibility with
+            1.0: setting this to -1 or None is equivalent to passing
+            infer_dictionary=True.
+        schema : Schema, default None
+            Use this schema instead of inferring a schema from partition
+            values. Partition values will be validated against this schema
+            before accumulation into the Partitioning's dictionary.
+        segment_encoding : str, default "uri"
+            After splitting paths into segments, decode the segments. Valid
+            values are "uri" (URI-decode segments) and "none" (leave as-is).
+
+        Returns
+        -------
+        PartitioningFactory
+            To be used in the FileSystemFactoryOptions.
+        """
+        cdef:
+            CPartitioningFactoryOptions c_options
+            vector[c_string] c_field_names
+
+        if max_partition_dictionary_size in {-1, None}:
+            infer_dictionary = True
+        elif max_partition_dictionary_size != 0:
+            raise NotImplementedError("max_partition_dictionary_size must be "
+                                      "0, -1, or None")
+
+        if infer_dictionary:
+            c_options.infer_dictionary = True
+
+        if schema:
+            c_options.schema = pyarrow_unwrap_schema(schema)
+            c_field_names = [tobytes(f.name) for f in schema]
+        elif not field_names:
+            raise ValueError(
+                "Neither field_names nor schema was passed; "
+                "cannot infer field_names")
+        else:
+            c_field_names = [tobytes(s) for s in field_names]
+
+        c_options.segment_encoding = _get_segment_encoding(segment_encoding)
+
+        return PartitioningFactory.wrap(
+            CFilenamePartitioning.MakeFactory(c_field_names, c_options))
+
+    @property
+    def dictionaries(self):

Review comment:
       Well, you could add an intermediate class for it in the hierarchy :-)

##########
File path: cpp/src/arrow/dataset/partition.cc
##########
@@ -561,6 +636,74 @@ class DirectoryPartitioningFactory : public KeyValuePartitioningFactory {
   std::vector<std::string> field_names_;
 };
 
+class FilenamePartitioningFactory : public KeyValuePartitioningFactory {
+ public:
+  FilenamePartitioningFactory(std::vector<std::string> field_names,
+                              PartitioningFactoryOptions options)
+      : KeyValuePartitioningFactory(options), field_names_(std::move(field_names)) {
+    Reset();
+    util::InitializeUTF8();
+  }
+
+  std::string type_name() const override { return "filename"; }
+
+  Result<std::shared_ptr<Schema>> Inspect(
+      const std::vector<std::string>& paths) override {
+    for (const auto& path : paths) {
+      size_t field_index = 0;

Review comment:
       Oh, I see this is essentially copy-pasted from `DirectoryPartitioningFactory::Inspect`. Is it possible to factor this out to avoid significant code repetition?
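       A sketch of one way to share it (`ParseSegments`/`InspectSegments` are hypothetical names; the "..." elides the per-segment accumulation that `DirectoryPartitioningFactory::Inspect` performs today):

   ```cpp
   class KeyValuePartitioningFactory : public PartitioningFactory {
    protected:
     // Each factory subclass supplies only how a path decomposes into raw
     // key segments: directory components for the directory flavor,
     // "_"-separated filename pieces for the filename flavor.
     virtual std::vector<std::string> ParseSegments(const std::string& path) const = 0;

     // Shared driver over all paths, replacing the copy-pasted loops.
     Result<std::shared_ptr<Schema>> InspectSegments(
         const std::vector<std::string>& paths) {
       for (const auto& path : paths) {
         size_t field_index = 0;
         for (const auto& segment : ParseSegments(path)) {
           // ... shared per-segment validation and value accumulation,
           // keyed by field_index, as in the directory factory today ...
           ++field_index;
         }
       }
       return DoInspect();  // assuming the existing finalization helper
     }
   };
   ```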

##########
File path: python/pyarrow/dataset.py
##########
@@ -117,6 +118,11 @@ def partitioning(schema=None, field_names=None, flavor=None,
       For example, given schema<year:int16, month:int8, day:int8>, a possible
       path would be "/year=2009/month=11/day=15" (but the field order does not
       need to match).
+    - "FilenamePartitioning": this scheme expects the partitions will have
+      filenames containing the field values separated by "_".
+      For example, given schema<year:int16, month:int8, day:int8>, a possible
+      partition filename "2009_11_part-0.parquet" would be parsed
+      to ("year"_ == 2009 and "month"_ == 11).

Review comment:
       Can you also update the description of the "flavor" parameter below?

##########
File path: cpp/src/arrow/dataset/partition.h
##########
@@ -321,6 +330,30 @@ class ARROW_DS_EXPORT FunctionPartitioning : public Partitioning {
   std::string name_;
 };
 
+class ARROW_DS_EXPORT FilenamePartitioning : public KeyValuePartitioning {
+ public:
+  /// If a field in schema is of dictionary type, the corresponding element of
+  /// dictionaries must be contain the dictionary of values for that field.

Review comment:
       It seems you didn't commit the change, though?

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -1440,7 +1441,6 @@ cdef class DirectoryPartitioning(Partitioning):
             res.append(pyarrow_wrap_array(arr))
         return res
 
-

Review comment:
       I think we need to keep two blank lines between class definitions as per [PEP 8](https://peps.python.org/pep-0008/#blank-lines).

##########
File path: cpp/src/arrow/dataset/partition.cc
##########
@@ -329,6 +342,44 @@ Result<std::vector<KeyValuePartitioning::Key>> DirectoryPartitioning::ParseKeys(
   return keys;
 }
 
+FilenamePartitioning::FilenamePartitioning(std::shared_ptr<Schema> schema,
+                                           ArrayVector dictionaries,
+                                           KeyValuePartitioningOptions options)
+    : KeyValuePartitioning(std::move(schema), std::move(dictionaries), options) {
+  util::InitializeUTF8();
+}
+
+Result<std::vector<KeyValuePartitioning::Key>> FilenamePartitioning::ParseKeys(
+    const std::string& path) const {
+  std::vector<Key> keys;

Review comment:
       Tiny improvement:
   ```suggestion
     std::vector<Key> keys;
     keys.reserve(schema_->num_fields());
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

