pitrou commented on code in PR #13830:
URL: https://github.com/apache/arrow/pull/13830#discussion_r954773835


##########
cpp/src/arrow/dataset/file_base.h:
##########
@@ -211,8 +221,12 @@ class ARROW_DS_EXPORT FileFragment : public Fragment,
 
   FileSource source_;
   std::shared_ptr<FileFormat> format_;
-
   friend class FileFormat;
+
+  // we do not want ParquetFileFragment to inherit these things.

Review Comment:
   Er... if we do not want these members to be inherited, we should probably 
put them in a derived class?
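
   A minimal sketch of that layout, assuming the bounds live in a subclass 
   that only byte-splittable formats derive from. All class names here 
   (`FileFragmentBase`, `ByteRangeFragment`, `ParquetLikeFragment`) are 
   hypothetical stand-ins, not the Arrow classes:

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>

// Hypothetical stand-in for the common fragment base; it carries no byte bounds.
class FileFragmentBase {
 public:
  explicit FileFragmentBase(std::string path) : path_(std::move(path)) {}
  virtual ~FileFragmentBase() = default;
  const std::string& path() const { return path_; }

 private:
  std::string path_;
};

// Only formats that can be split on byte offsets derive from this class.
class ByteRangeFragment : public FileFragmentBase {
 public:
  using FileFragmentBase::FileFragmentBase;
  void set_bounds(int64_t start, int64_t end) {
    start_byte_ = start;
    end_byte_ = end;
  }
  int64_t start_byte() const { return start_byte_; }
  int64_t end_byte() const { return end_byte_; }

 private:
  int64_t start_byte_ = 0;
  int64_t end_byte_ = 0;
};

// A Parquet-like fragment derives from the base, so it has no set_bounds at all.
class ParquetLikeFragment : public FileFragmentBase {
 public:
  using FileFragmentBase::FileFragmentBase;
};
```

   With this shape, calling `set_bounds` on a Parquet-like fragment fails at 
   compile time instead of needing a runtime warning.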



##########
cpp/src/arrow/dataset/file_base.h:
##########
@@ -196,6 +200,12 @@ class ARROW_DS_EXPORT FileFragment : public Fragment,
 
   const FileSource& source() const { return source_; }
   const std::shared_ptr<FileFormat>& format() const { return format_; }
+  const int64_t start_byte() const { return start_byte_; }
+  const int64_t end_byte() const { return end_byte_; }
+  void set_bounds(int64_t start, int64_t end) {

Review Comment:
   Why not let this be configured as part of the `CsvFragmentScanOptions` or 
something?



##########
cpp/src/arrow/dataset/file_csv.cc:
##########
@@ -177,20 +177,15 @@ static inline Result<csv::ReadOptions> GetReadOptions(
 }
 
 static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
-    const FileSource& source, const CsvFileFormat& format,
-    const std::shared_ptr<ScanOptions>& scan_options, Executor* cpu_executor) {
+    const std::shared_ptr<io::BufferedInputStream>& input, const std::string& path,

Review Comment:
   Why not take `InputStream` instead?



##########
cpp/src/arrow/dataset/file_parquet.cc:
##########
@@ -579,6 +579,11 @@ ParquetFileFragment::ParquetFileFragment(FileSource source,
       parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
       row_groups_(std::move(row_groups)) {}
 
+void ParquetFileFragment::set_bounds(int64_t start, int64_t end) {
+  ARROW_LOG(WARNING) << "Setting byte bounds for a ParquetFileFragment not supported! "
+                        "Use subset instead.";

Review Comment:
   If we want to convey an error, we should not emit a warning message but 
return an error status.
   Conversely, if we mean this to actually succeed (and be a no-op) on Parquet 
file fragments, then the warning message is merely distracting.
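
   To make the first option concrete, here is a minimal sketch of `set_bounds` 
   reporting failure through its return value. The `Status` class below is a 
   hypothetical stand-in so the snippet compiles on its own (Arrow's own 
   `Status::NotImplemented` would be the natural equivalent), and 
   `CsvLikeFragment` / `ParquetLikeFragment` are illustrative names, not the 
   PR's classes:

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>

// Hypothetical, minimal stand-in for arrow::Status (assumption for this sketch).
class Status {
 public:
  static Status OK() { return Status(true, ""); }
  static Status NotImplemented(std::string msg) {
    return Status(false, std::move(msg));
  }
  bool ok() const { return ok_; }
  const std::string& message() const { return message_; }

 private:
  Status(bool ok, std::string message) : ok_(ok), message_(std::move(message)) {}
  bool ok_;
  std::string message_;
};

// Illustrative fragment that supports byte bounds.
struct CsvLikeFragment {
  int64_t start = 0, end = 0;
  Status set_bounds(int64_t s, int64_t e) {
    start = s;
    end = e;
    return Status::OK();
  }
};

// Illustrative fragment that rejects byte bounds with an error, not a log line.
struct ParquetLikeFragment {
  Status set_bounds(int64_t, int64_t) {
    return Status::NotImplemented(
        "Setting byte bounds for a Parquet fragment is not supported; "
        "use subset instead");
  }
};
```

   The caller then has to handle the failure explicitly rather than finding 
   it in a log.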



##########
cpp/src/arrow/dataset/file_base.h:
##########
@@ -211,8 +221,12 @@ class ARROW_DS_EXPORT FileFragment : public Fragment,
 
   FileSource source_;
   std::shared_ptr<FileFormat> format_;
-
   friend class FileFormat;
+
+  // we do not want ParquetFileFragment to inherit these things.
+ private:
+  int64_t start_byte_ = 0;
+  int64_t end_byte_ = 0;

Review Comment:
   Does this mean an empty range? It seems a bit confusing otherwise (should 
`util::optional` be used perhaps?)
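
   A small sketch of the optional-based variant, written with `std::optional` 
   (C++17) so it compiles standalone; in the Arrow tree this would presumably 
   be `util::optional`. `ByteRange` and `FragmentBounds` are hypothetical 
   names, and the half-open `[start, end)` convention is an assumption:

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical byte range; assumed half-open: [start, end).
struct ByteRange {
  int64_t start;
  int64_t end;
};

class FragmentBounds {
 public:
  void set_bounds(int64_t start, int64_t end) { range_ = ByteRange{start, end}; }
  bool has_bounds() const { return range_.has_value(); }
  const std::optional<ByteRange>& bounds() const { return range_; }

 private:
  // Empty optional means "no bounds set, read the whole file".
  std::optional<ByteRange> range_;
};
```

   This way an explicitly set `(0, 0)` range (empty) is distinguishable from 
   "never set".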



##########
python/pyarrow/_dataset_parquet.pyx:
##########
@@ -317,6 +317,13 @@ cdef class ParquetFileFragment(FileFragment):
             row_groups
         )
 
+    def slice(self, start, end):
+        """
+        Slice is not implemented for Parquet files.
+        """
+        raise Exception("Not Implemented! You cannot slice a Parquet file by byte range.

Review Comment:
   `NotImplementedError` is what you are looking for :-)
   https://docs.python.org/3/library/exceptions.html#NotImplementedError



##########
cpp/src/arrow/dataset/file_base.cc:
##########
@@ -89,6 +89,28 @@ Result<std::shared_ptr<io::InputStream>> FileSource::OpenCompressed(
   return io::CompressedInputStream::Make(codec.get(), std::move(file));
 }
 
+Result<std::shared_ptr<io::InputStream>> FileSource::OpenRange(int64_t start,
+                                                               int64_t end) const {
+  ARROW_ASSIGN_OR_RAISE(auto file, Open());
+
+  auto actual_compression = Compression::type::UNCOMPRESSED;
+
+  auto extension = fs::internal::GetAbstractPathExtension(path());
+  if (extension == "gz") {

Review Comment:
   I'm curious, why is this special case needed?



##########
python/pyarrow/_dataset.pyx:
##########
@@ -1046,6 +1046,21 @@ cdef class FileFragment(Fragment):
             self.partition_expression
         )
 
+    def slice(self, start, end):
+        """
+        Returns a new FileFragment object that will only read 
+        a slice of the old FileFragment defined by start (start byte)
+        and end (end byte).
+        """
+
+        cdef FileFragment new_fragment = self.format.make_fragment(
+            self.path if self.buffer is None else self.buffer,
+            self.filesystem,
+            self.partition_expression
+        )
+        new_fragment.file_fragment.set_bounds(start, end)

Review Comment:
   Hmm... what happens if I call `slice()` several times (e.g. 
   `fragment.slice(1, 10).slice(2, 11)`)?
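
   To spell out the two behaviours I can imagine, here is a sketch with 
   hypothetical helpers (`SliceAbsolute`, `SliceRelative`) on a plain 
   `ByteRange`. It assumes non-negative offsets with `start <= end` and a 
   half-open range. As far as I can tell from the visible part of the hunk, 
   the new fragment is built from the path and given absolute bounds, which 
   would correspond to the first variant:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical byte range; assumed half-open: [start, end).
struct ByteRange {
  int64_t start;
  int64_t end;
};

// Variant 1: each call replaces the bounds; earlier slices are forgotten.
ByteRange SliceAbsolute(const ByteRange& /*current*/, int64_t start, int64_t end) {
  return ByteRange{start, end};
}

// Variant 2: offsets are relative to the current range and clamped to it.
ByteRange SliceRelative(const ByteRange& current, int64_t start, int64_t end) {
  int64_t new_start = std::min(current.start + start, current.end);
  int64_t new_end = std::min(current.start + end, current.end);
  return ByteRange{new_start, std::max(new_start, new_end)};
}
```

   For a 100-byte file, `slice(1, 10).slice(2, 11)` ends up as `[2, 11)` under 
   the first variant and `[3, 10)` under the second, so the docstring should 
   say which one is intended.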



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
