This is an automated email from the ASF dual-hosted git repository.

jorisvandenbossche pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d7dca6  ARROW-8074: [C++][Dataset][Python] FileFragments from buffers 
and NativeFiles
9d7dca6 is described below

commit 9d7dca643fe60b1131c62646f4a3849262bb46b9
Author: Benjamin Kietzman <[email protected]>
AuthorDate: Thu Jun 18 14:08:32 2020 +0200

    ARROW-8074: [C++][Dataset][Python] FileFragments from buffers and 
NativeFiles
    
    Adds `ds.FileSource`, which represents an openable file and may be 
initialized from a `path, filesystem`, a `Buffer`, or any python object which 
can be wrapped by `NativeFile`.
    
    `test_parquet.py` now uses `BytesIO` as the roundtrip medium for non-legacy 
`ParquetDataset` instead of resorting to a mock filesystem. Other than that, the 
integration with Python is somewhat haphazard; I'm thinking we need to rewrite 
some of the APIs to be less magical about figuring out what is a selector, 
path, list(paths), etc., since we will be adding buffers and `NativeFile`s to 
the mix.
    
    Closes #7156 from bkietz/8047-FileFragments-from-NativeFile
    
    Lead-authored-by: Benjamin Kietzman <[email protected]>
    Co-authored-by: Joris Van den Bossche <[email protected]>
    Signed-off-by: Joris Van den Bossche <[email protected]>
---
 cpp/examples/arrow/dataset-parquet-scan-example.cc |   6 +-
 cpp/src/arrow/dataset/file_base.cc                 |  39 ++--
 cpp/src/arrow/dataset/file_base.h                  | 140 +++++++++------
 cpp/src/arrow/dataset/file_ipc.cc                  |   6 +-
 cpp/src/arrow/dataset/file_ipc.h                   |   2 +-
 cpp/src/arrow/dataset/file_ipc_test.cc             |   4 +-
 cpp/src/arrow/dataset/file_test.cc                 |  14 +-
 cpp/src/arrow/flight/client.cc                     |  12 +-
 cpp/src/arrow/python/common.cc                     |   7 -
 cpp/src/arrow/python/common.h                      |   6 +-
 cpp/src/arrow/python/numpy_convert.cc              |   4 +-
 cpp/src/arrow/python/serialize.cc                  |   2 +-
 cpp/src/arrow/result.h                             |   7 +-
 cpp/src/arrow/status.h                             |   5 +-
 cpp/src/arrow/util/checked_cast.h                  |   7 +-
 dev/archery/archery/cli.py                         |   2 +-
 python/pyarrow/_dataset.pyx                        | 199 +++++++++++++--------
 python/pyarrow/includes/common.pxd                 |   5 +
 python/pyarrow/includes/libarrow_dataset.pxd       |   6 +-
 python/pyarrow/io.pxi                              |   6 +-
 python/pyarrow/lib.pxd                             |   1 +
 python/pyarrow/parquet.py                          |  57 +++---
 python/pyarrow/tests/test_parquet.py               |  60 +++----
 23 files changed, 340 insertions(+), 257 deletions(-)

diff --git a/cpp/examples/arrow/dataset-parquet-scan-example.cc 
b/cpp/examples/arrow/dataset-parquet-scan-example.cc
index 40e1556..ed4b89d 100644
--- a/cpp/examples/arrow/dataset-parquet-scan-example.cc
+++ b/cpp/examples/arrow/dataset-parquet-scan-example.cc
@@ -109,7 +109,8 @@ std::shared_ptr<ds::Dataset> GetDatasetFromFile(
     std::string file) {
   ds::FileSystemFactoryOptions options;
   // The factory will try to build a child dataset.
-  auto factory = ds::FileSystemDatasetFactory::Make(fs, {file}, format, 
options).ValueOrDie();
+  auto factory =
+      ds::FileSystemDatasetFactory::Make(fs, {file}, format, 
options).ValueOrDie();
 
   // Try to infer a common schema for all files.
   auto schema = factory->Inspect(conf.inspect_options).ValueOrDie();
@@ -117,7 +118,8 @@ std::shared_ptr<ds::Dataset> GetDatasetFromFile(
   // with the previous one, e.g. `factory->Finish(compatible_schema)`.
   auto child = factory->Finish(conf.finish_options).ValueOrDie();
 
-  ds::DatasetVector children{conf.repeat, child};
+  ds::DatasetVector children;
+  children.resize(conf.repeat, child);
   auto dataset = ds::UnionDataset::Make(std::move(schema), 
std::move(children));
 
   return dataset.ValueOrDie();
diff --git a/cpp/src/arrow/dataset/file_base.cc 
b/cpp/src/arrow/dataset/file_base.cc
index 8f663bb..a39289d 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -35,24 +35,23 @@ namespace arrow {
 namespace dataset {
 
 Result<std::shared_ptr<arrow::io::RandomAccessFile>> FileSource::Open() const {
-  if (id() == PATH) {
-    return filesystem()->OpenInputFile(path());
+  if (filesystem_) {
+    return filesystem_->OpenInputFile(path_);
   }
 
-  return std::make_shared<::arrow::io::BufferReader>(buffer());
-}
-
-Result<std::shared_ptr<arrow::io::OutputStream>> FileSource::OpenWritable() 
const {
-  if (!writable_) {
-    return Status::Invalid("file source '", path(), "' is not writable");
+  if (buffer_) {
+    return std::make_shared<::arrow::io::BufferReader>(buffer_);
   }
 
-  if (id() == PATH) {
-    return filesystem()->OpenOutputStream(path());
+  return custom_open_();
+}
+
+Result<std::shared_ptr<arrow::io::OutputStream>> WritableFileSource::Open() 
const {
+  if (filesystem_) {
+    return filesystem_->OpenOutputStream(path_);
   }
 
-  auto b = internal::checked_pointer_cast<ResizableBuffer>(buffer());
-  return std::make_shared<::arrow::io::BufferOutputStream>(b);
+  return std::make_shared<::arrow::io::BufferOutputStream>(buffer_);
 }
 
 Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(FileSource 
source) {
@@ -66,7 +65,7 @@ Result<std::shared_ptr<FileFragment>> 
FileFormat::MakeFragment(
 }
 
 Result<std::shared_ptr<WriteTask>> FileFormat::WriteFragment(
-    FileSource destination, std::shared_ptr<Fragment> fragment,
+    WritableFileSource destination, std::shared_ptr<Fragment> fragment,
     std::shared_ptr<ScanOptions> scan_options,
     std::shared_ptr<ScanContext> scan_context) {
   return Status::NotImplemented("writing fragment of format ", type_name());
@@ -168,15 +167,15 @@ Result<std::shared_ptr<FileSystemDataset>> 
FileSystemDataset::Write(
       const auto& input_fragment = op.fragment();
       FileSource dest(path, filesystem);
 
-      ARROW_ASSIGN_OR_RAISE(
-          auto fragment,
-          plan.format->MakeFragment(dest, 
input_fragment->partition_expression()));
-      fragments.push_back(std::move(fragment));
+      ARROW_ASSIGN_OR_RAISE(auto write_task,
+                            plan.format->WriteFragment({path, filesystem}, 
input_fragment,
+                                                       scan_options, 
scan_context));
+      task_group->Append([write_task] { return write_task->Execute(); });
 
       ARROW_ASSIGN_OR_RAISE(
-          auto write_task,
-          plan.format->WriteFragment(dest, input_fragment, scan_options, 
scan_context));
-      task_group->Append([write_task] { return write_task->Execute(); });
+          auto fragment, plan.format->MakeFragment(
+                             {path, filesystem}, 
input_fragment->partition_expression()));
+      fragments.push_back(std::move(fragment));
     }
   }
 
diff --git a/cpp/src/arrow/dataset/file_base.h 
b/cpp/src/arrow/dataset/file_base.h
index 9d747b4..466ed0f 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -19,6 +19,7 @@
 
 #pragma once
 
+#include <functional>
 #include <memory>
 #include <string>
 #include <unordered_map>
@@ -44,88 +45,109 @@ namespace dataset {
 /// be read like a file
 class ARROW_DS_EXPORT FileSource {
  public:
-  // NOTE(kszucs): it'd be better to separate the BufferSource from FileSource
-  enum SourceKind { PATH, BUFFER };
-
   FileSource(std::string path, std::shared_ptr<fs::FileSystem> filesystem,
-             Compression::type compression = Compression::UNCOMPRESSED,
-             bool writable = true)
-      : source_kind_(PATH),
-        path_(std::move(path)),
+             Compression::type compression = Compression::UNCOMPRESSED)
+      : path_(std::move(path)),
         filesystem_(std::move(filesystem)),
-        compression_(compression),
-        writable_(writable) {}
+        compression_(compression) {}
 
   explicit FileSource(std::shared_ptr<Buffer> buffer,
                       Compression::type compression = 
Compression::UNCOMPRESSED)
-      : source_kind_(BUFFER), buffer_(std::move(buffer)), 
compression_(compression) {}
+      : buffer_(std::move(buffer)), compression_(compression) {}
+
+  using CustomOpen = 
std::function<Result<std::shared_ptr<io::RandomAccessFile>>()>;
+  explicit FileSource(CustomOpen open) : custom_open_(std::move(open)) {}
 
-  explicit FileSource(std::shared_ptr<ResizableBuffer> buffer,
+  using CustomOpenWithCompression =
+      
std::function<Result<std::shared_ptr<io::RandomAccessFile>>(Compression::type)>;
+  explicit FileSource(CustomOpenWithCompression open_with_compression,
                       Compression::type compression = 
Compression::UNCOMPRESSED)
-      : source_kind_(BUFFER),
-        buffer_(std::move(buffer)),
-        compression_(compression),
-        writable_(true) {}
-
-  bool operator==(const FileSource& other) const {
-    if (id() != other.id()) {
-      return false;
-    }
+      : custom_open_(std::bind(std::move(open_with_compression), compression)),
+        compression_(compression) {}
 
-    if (id() == PATH) {
-      return path() == other.path() && filesystem() == other.filesystem();
-    }
+  explicit FileSource(std::shared_ptr<io::RandomAccessFile> file,
+                      Compression::type compression = 
Compression::UNCOMPRESSED)
+      : custom_open_([=] { return ToResult(file); }), 
compression_(compression) {}
 
-    return buffer()->Equals(*other.buffer());
-  }
+  FileSource() : custom_open_(CustomOpen{&InvalidOpen}) {}
 
-  /// \brief The kind of file, whether stored in a filesystem, memory
-  /// resident, or other
-  SourceKind id() const { return source_kind_; }
+  static std::vector<FileSource> FromPaths(const 
std::shared_ptr<fs::FileSystem>& fs,
+                                           std::vector<std::string> paths) {
+    std::vector<FileSource> sources;
+    for (auto&& path : paths) {
+      sources.emplace_back(std::move(path), fs);
+    }
+    return sources;
+  }
 
-  /// \brief Return the type of raw compression on the file, if any
+  /// \brief Return the type of raw compression on the file, if any.
   Compression::type compression() const { return compression_; }
 
-  /// \brief Whether the this source may be opened writable
-  bool writable() const { return writable_; }
-
-  /// \brief Return the file path, if any. Only valid when file source
-  /// type is PATH
+  /// \brief Return the file path, if any. Only valid when file source wraps a 
path.
   const std::string& path() const {
     static std::string buffer_path = "<Buffer>";
-    return id() == PATH ? path_ : buffer_path;
+    static std::string custom_open_path = "<Buffer>";
+    return filesystem_ ? path_ : buffer_ ? buffer_path : custom_open_path;
   }
 
-  /// \brief Return the filesystem, if any. Only non null when file
-  /// source type is PATH
-  const std::shared_ptr<fs::FileSystem>& filesystem() const {
-    static std::shared_ptr<fs::FileSystem> no_fs = NULLPTR;
-    return id() == PATH ? filesystem_ : no_fs;
-  }
+  /// \brief Return the filesystem, if any. Otherwise returns nullptr
+  const std::shared_ptr<fs::FileSystem>& filesystem() const { return 
filesystem_; }
 
-  /// \brief Return the buffer containing the file, if any. Only value
-  /// when file source type is BUFFER
-  const std::shared_ptr<Buffer>& buffer() const {
-    static std::shared_ptr<Buffer> path_buffer = NULLPTR;
-    return id() == BUFFER ? buffer_ : path_buffer;
-  }
+  /// \brief Return the buffer containing the file, if any. Otherwise returns 
nullptr
+  const std::shared_ptr<Buffer>& buffer() const { return buffer_; }
 
   /// \brief Get a RandomAccessFile which views this file source
-  Result<std::shared_ptr<arrow::io::RandomAccessFile>> Open() const;
-
-  /// \brief Get an OutputStream which wraps this file source
-  Result<std::shared_ptr<arrow::io::OutputStream>> OpenWritable() const;
+  Result<std::shared_ptr<io::RandomAccessFile>> Open() const;
 
  private:
-  SourceKind source_kind_;
+  static Result<std::shared_ptr<io::RandomAccessFile>> InvalidOpen() {
+    return Status::Invalid("Called Open() on an uninitialized FileSource");
+  }
 
   std::string path_;
   std::shared_ptr<fs::FileSystem> filesystem_;
-
   std::shared_ptr<Buffer> buffer_;
+  CustomOpen custom_open_;
+  Compression::type compression_ = Compression::UNCOMPRESSED;
+};
+
+/// \brief The path and filesystem where an actual file is located or a buffer 
which can
+/// be written to like a file
+class ARROW_DS_EXPORT WritableFileSource {
+ public:
+  WritableFileSource(std::string path, std::shared_ptr<fs::FileSystem> 
filesystem,
+                     Compression::type compression = Compression::UNCOMPRESSED)
+      : path_(std::move(path)),
+        filesystem_(std::move(filesystem)),
+        compression_(compression) {}
+
+  explicit WritableFileSource(std::shared_ptr<ResizableBuffer> buffer,
+                              Compression::type compression = 
Compression::UNCOMPRESSED)
+      : buffer_(std::move(buffer)), compression_(compression) {}
+
+  /// \brief Return the type of raw compression on the file, if any
+  Compression::type compression() const { return compression_; }
+
+  /// \brief Return the file path, if any. Only valid when file source wraps a 
path.
+  const std::string& path() const {
+    static std::string buffer_path = "<Buffer>";
+    return filesystem_ ? path_ : buffer_path;
+  }
+
+  /// \brief Return the filesystem, if any. Otherwise returns nullptr
+  const std::shared_ptr<fs::FileSystem>& filesystem() const { return 
filesystem_; }
 
-  Compression::type compression_;
-  bool writable_ = false;
+  /// \brief Return the buffer containing the file, if any. Otherwise returns 
nullptr
+  const std::shared_ptr<ResizableBuffer>& buffer() const { return buffer_; }
+
+  /// \brief Get an OutputStream which wraps this file source
+  Result<std::shared_ptr<arrow::io::OutputStream>> Open() const;
+
+ private:
+  std::string path_;
+  std::shared_ptr<fs::FileSystem> filesystem_;
+  std::shared_ptr<ResizableBuffer> buffer_;
+  Compression::type compression_ = Compression::UNCOMPRESSED;
 };
 
 /// \brief Base class for file format implementation
@@ -159,7 +181,7 @@ class ARROW_DS_EXPORT FileFormat : public 
std::enable_shared_from_this<FileForma
   /// \brief Write a fragment. If the parent directory of destination does not 
exist, it
   /// will be created.
   virtual Result<std::shared_ptr<WriteTask>> WriteFragment(
-      FileSource destination, std::shared_ptr<Fragment> fragment,
+      WritableFileSource destination, std::shared_ptr<Fragment> fragment,
       std::shared_ptr<ScanOptions> options,
       std::shared_ptr<ScanContext> scan_context);  // FIXME(bkietz) make this 
pure virtual
 };
@@ -256,16 +278,16 @@ class ARROW_DS_EXPORT WriteTask {
 
   virtual ~WriteTask() = default;
 
-  const FileSource& destination() const;
+  const WritableFileSource& destination() const;
   const std::shared_ptr<FileFormat>& format() const { return format_; }
 
  protected:
-  WriteTask(FileSource destination, std::shared_ptr<FileFormat> format)
+  WriteTask(WritableFileSource destination, std::shared_ptr<FileFormat> format)
       : destination_(std::move(destination)), format_(std::move(format)) {}
 
   Status CreateDestinationParentDir() const;
 
-  FileSource destination_;
+  WritableFileSource destination_;
   std::shared_ptr<FileFormat> format_;
 };
 
diff --git a/cpp/src/arrow/dataset/file_ipc.cc 
b/cpp/src/arrow/dataset/file_ipc.cc
index 6a509cd..f080a25 100644
--- a/cpp/src/arrow/dataset/file_ipc.cc
+++ b/cpp/src/arrow/dataset/file_ipc.cc
@@ -160,7 +160,7 @@ Result<ScanTaskIterator> IpcFileFormat::ScanFile(
 
 class IpcWriteTask : public WriteTask {
  public:
-  IpcWriteTask(FileSource destination, std::shared_ptr<FileFormat> format,
+  IpcWriteTask(WritableFileSource destination, std::shared_ptr<FileFormat> 
format,
                std::shared_ptr<Fragment> fragment,
                std::shared_ptr<ScanOptions> scan_options,
                std::shared_ptr<ScanContext> scan_context)
@@ -174,7 +174,7 @@ class IpcWriteTask : public WriteTask {
 
     auto schema = scan_options_->schema();
 
-    ARROW_ASSIGN_OR_RAISE(auto out_stream, destination_.OpenWritable());
+    ARROW_ASSIGN_OR_RAISE(auto out_stream, destination_.Open());
     ARROW_ASSIGN_OR_RAISE(auto writer, ipc::NewFileWriter(out_stream.get(), 
schema));
     ARROW_ASSIGN_OR_RAISE(auto scan_task_it,
                           fragment_->Scan(scan_options_, scan_context_));
@@ -200,7 +200,7 @@ class IpcWriteTask : public WriteTask {
 };
 
 Result<std::shared_ptr<WriteTask>> IpcFileFormat::WriteFragment(
-    FileSource destination, std::shared_ptr<Fragment> fragment,
+    WritableFileSource destination, std::shared_ptr<Fragment> fragment,
     std::shared_ptr<ScanOptions> scan_options,
     std::shared_ptr<ScanContext> scan_context) {
   return std::make_shared<IpcWriteTask>(std::move(destination), 
shared_from_this(),
diff --git a/cpp/src/arrow/dataset/file_ipc.h b/cpp/src/arrow/dataset/file_ipc.h
index eb465bf..70db5e9 100644
--- a/cpp/src/arrow/dataset/file_ipc.h
+++ b/cpp/src/arrow/dataset/file_ipc.h
@@ -48,7 +48,7 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat {
                                     std::shared_ptr<ScanContext> context) 
const override;
 
   Result<std::shared_ptr<WriteTask>> WriteFragment(
-      FileSource destination, std::shared_ptr<Fragment> fragment,
+      WritableFileSource destination, std::shared_ptr<Fragment> fragment,
       std::shared_ptr<ScanOptions> options,
       std::shared_ptr<ScanContext> context) override;
 };
diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc 
b/cpp/src/arrow/dataset/file_ipc_test.cc
index 827ca5e..747dccf 100644
--- a/cpp/src/arrow/dataset/file_ipc_test.cc
+++ b/cpp/src/arrow/dataset/file_ipc_test.cc
@@ -87,10 +87,10 @@ class TestIpcFileFormat : public ArrowIpcWriterMixin {
                                     kBatchRepetitions);
   }
 
-  Result<FileSource> GetFileSink() {
+  Result<WritableFileSource> GetFileSink() {
     ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ResizableBuffer> buffer,
                           AllocateResizableBuffer(0));
-    return FileSource(std::move(buffer));
+    return WritableFileSource(std::move(buffer));
   }
 
   RecordBatchIterator Batches(ScanTaskIterator scan_task_it) {
diff --git a/cpp/src/arrow/dataset/file_test.cc 
b/cpp/src/arrow/dataset/file_test.cc
index 8651c2d..a2d3add 100644
--- a/cpp/src/arrow/dataset/file_test.cc
+++ b/cpp/src/arrow/dataset/file_test.cc
@@ -49,17 +49,17 @@ TEST(FileSource, PathBased) {
 
   ASSERT_EQ(p1, source1.path());
   ASSERT_TRUE(localfs->Equals(*source1.filesystem()));
-  ASSERT_EQ(FileSource::PATH, source1.id());
   ASSERT_EQ(Compression::UNCOMPRESSED, source1.compression());
 
   ASSERT_EQ(p2, source2.path());
   ASSERT_TRUE(localfs->Equals(*source2.filesystem()));
-  ASSERT_EQ(FileSource::PATH, source2.id());
   ASSERT_EQ(Compression::GZIP, source2.compression());
 
   // Test copy constructor and comparison
-  FileSource source3 = source1;
-  ASSERT_EQ(source1, source3);
+  FileSource source3;
+  source3 = source1;
+  ASSERT_EQ(source1.path(), source3.path());
+  ASSERT_EQ(source1.filesystem(), source3.filesystem());
 }
 
 TEST(FileSource, BufferBased) {
@@ -69,13 +69,15 @@ TEST(FileSource, BufferBased) {
   FileSource source1(buf);
   FileSource source2(buf, Compression::LZ4);
 
-  ASSERT_EQ(FileSource::BUFFER, source1.id());
   ASSERT_TRUE(source1.buffer()->Equals(*buf));
   ASSERT_EQ(Compression::UNCOMPRESSED, source1.compression());
 
-  ASSERT_EQ(FileSource::BUFFER, source2.id());
   ASSERT_TRUE(source2.buffer()->Equals(*buf));
   ASSERT_EQ(Compression::LZ4, source2.compression());
+
+  FileSource source3;
+  source3 = source1;
+  ASSERT_EQ(source1.buffer(), source3.buffer());
 }
 
 TEST_F(TestFileSystemDataset, Basic) {
diff --git a/cpp/src/arrow/flight/client.cc b/cpp/src/arrow/flight/client.cc
index a0fd3ff..c228f71 100644
--- a/cpp/src/arrow/flight/client.cc
+++ b/cpp/src/arrow/flight/client.cc
@@ -131,7 +131,7 @@ class FinishableStream {
   std::shared_ptr<Stream> stream() const { return stream_; }
 
   /// \brief Finish the call, adding server context to the given status.
-  virtual Status Finish(Status&& st) {
+  virtual Status Finish(Status st) {
     if (finished_) {
       return MergeStatus(std::move(st));
     }
@@ -202,7 +202,7 @@ class FinishableWritableStream : public 
FinishableStream<Stream, ReadT> {
     return Status::OK();
   }
 
-  Status Finish(Status&& st) override {
+  Status Finish(Status st) override {
     // This may be used concurrently by reader/writer side of a
     // stream, so it needs to be protected.
     std::lock_guard<std::mutex> guard(finish_mutex_);
@@ -223,14 +223,14 @@ class FinishableWritableStream : public 
FinishableStream<Stream, ReadT> {
     bool finished_writes = done_writing_ || this->stream()->WritesDone();
     done_writing_ = true;
 
-    Status result = FinishableStream<Stream, ReadT>::Finish(std::move(st));
+    st = FinishableStream<Stream, ReadT>::Finish(std::move(st));
 
     if (!finished_writes) {
       return Status::FromDetailAndArgs(
-          result.code(), result.detail(), result.message(),
+          st.code(), st.detail(), st.message(),
           ". Additionally, could not finish writing record batches before 
closing");
     }
-    return result;
+    return st;
   }
 
  private:
@@ -444,7 +444,7 @@ class GrpcIpcMessageReader : public ipc::MessageReader {
     // Validate IPC message
     auto result = data->OpenMessage();
     if (!result.ok()) {
-      return stream_->Finish(result.status());
+      return stream_->Finish(std::move(result).status());
     }
     *app_metadata_ = std::move(data->app_metadata);
     return result;
diff --git a/cpp/src/arrow/python/common.cc b/cpp/src/arrow/python/common.cc
index 5b7b36e..8ff3485 100644
--- a/cpp/src/arrow/python/common.cc
+++ b/cpp/src/arrow/python/common.cc
@@ -152,13 +152,6 @@ Status ConvertPyError(StatusCode code) {
   return Status(code, message, detail);
 }
 
-Status PassPyError() {
-  if (PyErr_Occurred()) {
-    return ConvertPyError();
-  }
-  return Status::OK();
-}
-
 bool IsPyError(const Status& status) {
   if (status.ok()) {
     return false;
diff --git a/cpp/src/arrow/python/common.h b/cpp/src/arrow/python/common.h
index e2757ed..c38a6f8 100644
--- a/cpp/src/arrow/python/common.h
+++ b/cpp/src/arrow/python/common.h
@@ -38,8 +38,6 @@ namespace py {
 // Convert current Python error to a Status.  The Python error state is cleared
 // and can be restored with RestorePyError().
 ARROW_PYTHON_EXPORT Status ConvertPyError(StatusCode code = 
StatusCode::UnknownError);
-// Same as ConvertPyError(), but returns Status::OK() if no Python error is 
set.
-ARROW_PYTHON_EXPORT Status PassPyError();
 // Query whether the given Status is a Python error (as wrapped by 
ConvertPyError()).
 ARROW_PYTHON_EXPORT bool IsPyError(const Status& status);
 // Restore a Python error wrapped in a Status.
@@ -55,9 +53,9 @@ inline Status CheckPyError(StatusCode code = 
StatusCode::UnknownError) {
   }
 }
 
-#define RETURN_IF_PYERROR() ARROW_RETURN_NOT_OK(CheckPyError());
+#define RETURN_IF_PYERROR() ARROW_RETURN_NOT_OK(CheckPyError())
 
-#define PY_RETURN_IF_ERROR(CODE) ARROW_RETURN_NOT_OK(CheckPyError(CODE));
+#define PY_RETURN_IF_ERROR(CODE) ARROW_RETURN_NOT_OK(CheckPyError(CODE))
 
 // For Cython, as you can't define template C++ functions in Cython, only use 
them.
 // This function can set a Python exception.  It assumes that T has a (cheap)
diff --git a/cpp/src/arrow/python/numpy_convert.cc 
b/cpp/src/arrow/python/numpy_convert.cc
index cd8da8d..85c5f33 100644
--- a/cpp/src/arrow/python/numpy_convert.cc
+++ b/cpp/src/arrow/python/numpy_convert.cc
@@ -277,7 +277,7 @@ Status TensorToNdarray(const std::shared_ptr<Tensor>& 
tensor, PyObject* base,
   PyObject* result =
       PyArray_NewFromDescr(&PyArray_Type, dtype, ndim, npy_shape.data(),
                            npy_strides.data(), mutable_data, array_flags, 
nullptr);
-  RETURN_IF_PYERROR()
+  RETURN_IF_PYERROR();
 
   if (base == Py_None || base == nullptr) {
     base = py::wrap_tensor(tensor);
@@ -309,7 +309,7 @@ static Status SparseTensorDataToNdarray(const SparseTensor& 
sparse_tensor,
   *out_data = PyArray_NewFromDescr(&PyArray_Type, dtype_data,
                                    static_cast<int>(data_shape.size()), 
data_shape.data(),
                                    nullptr, mutable_data, array_flags, 
nullptr);
-  RETURN_IF_PYERROR()
+  RETURN_IF_PYERROR();
   Py_XINCREF(base);
   PyArray_SetBaseObject(reinterpret_cast<PyArrayObject*>(*out_data), base);
   return Status::OK();
diff --git a/cpp/src/arrow/python/serialize.cc 
b/cpp/src/arrow/python/serialize.cc
index 9d10041..a4645b7 100644
--- a/cpp/src/arrow/python/serialize.cc
+++ b/cpp/src/arrow/python/serialize.cc
@@ -371,7 +371,7 @@ Status CallCustomCallback(PyObject* context, PyObject* 
method_name, PyObject* el
                                       ": handler not registered");
   } else {
     *result = PyObject_CallMethodObjArgs(context, method_name, elem, NULL);
-    return PassPyError();
+    return CheckPyError();
   }
 }
 
diff --git a/cpp/src/arrow/result.h b/cpp/src/arrow/result.h
index b492057..96863ec 100644
--- a/cpp/src/arrow/result.h
+++ b/cpp/src/arrow/result.h
@@ -298,7 +298,7 @@ class ARROW_MUST_USE_TYPE Result : public 
util::EqualityComparable<Result<T>> {
   ///
   /// \return The stored non-OK status object, or an OK status if this object
   ///         has a value.
-  Status status() const { return status_; }
+  const Status& status() const { return status_; }
 
   /// Gets the stored `T` value.
   ///
@@ -462,4 +462,9 @@ inline Status GenericToStatus(Result<T>&& res) {
 
 }  // namespace internal
 
+template <typename T>
+Result<T> ToResult(T t) {
+  return Result<T>(std::move(t));
+}
+
 }  // namespace arrow
diff --git a/cpp/src/arrow/status.h b/cpp/src/arrow/status.h
index 33ca303..36f1b90 100644
--- a/cpp/src/arrow/status.h
+++ b/cpp/src/arrow/status.h
@@ -306,8 +306,9 @@ class ARROW_MUST_USE_TYPE ARROW_EXPORT Status : public 
util::EqualityComparable<
   std::string message() const { return ok() ? "" : state_->msg; }
 
   /// \brief Return the status detail attached to this message.
-  std::shared_ptr<StatusDetail> detail() const {
-    return state_ == NULLPTR ? NULLPTR : state_->detail;
+  const std::shared_ptr<StatusDetail>& detail() const {
+    static std::shared_ptr<StatusDetail> no_detail = NULLPTR;
+    return state_ ? state_->detail : no_detail;
   }
 
   /// \brief Return a new Status copying the existing status, but
diff --git a/cpp/src/arrow/util/checked_cast.h 
b/cpp/src/arrow/util/checked_cast.h
index 3692bd9..97f6b61 100644
--- a/cpp/src/arrow/util/checked_cast.h
+++ b/cpp/src/arrow/util/checked_cast.h
@@ -19,6 +19,7 @@
 
 #include <memory>
 #include <type_traits>
+#include <utility>
 
 namespace arrow {
 namespace internal {
@@ -39,11 +40,11 @@ inline OutputType checked_cast(InputType&& value) {
 }
 
 template <class T, class U>
-std::shared_ptr<T> checked_pointer_cast(const std::shared_ptr<U>& r) noexcept {
+std::shared_ptr<T> checked_pointer_cast(std::shared_ptr<U> r) noexcept {
 #ifdef NDEBUG
-  return std::static_pointer_cast<T>(r);
+  return std::static_pointer_cast<T>(std::move(r));
 #else
-  return std::dynamic_pointer_cast<T>(r);
+  return std::dynamic_pointer_cast<T>(std::move(r));
 #endif
 }
 
diff --git a/dev/archery/archery/cli.py b/dev/archery/archery/cli.py
index 1cbc18e..82ab56a 100644
--- a/dev/archery/archery/cli.py
+++ b/dev/archery/archery/cli.py
@@ -714,7 +714,7 @@ def docker_compose_run(obj, image, command, *, env, user, 
force_pull,
                        dry_run, volume):
     """Execute docker-compose builds.
 
-    To see the available builds run `archery docker list`.
+    To see the available builds run `archery docker images`.
 
     Examples:
 
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index acb88e2..f4997a9 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -29,6 +29,7 @@ from pyarrow.includes.libarrow_dataset cimport *
 from pyarrow._fs cimport FileSystem, FileInfo, FileSelector
 from pyarrow._csv cimport ParseOptions
 from pyarrow._compute cimport CastOptions
+from pyarrow.util import _is_path_like, _stringify_path
 
 
 def _forbid_instantiation(klass, subclasses_instead=True):
@@ -43,6 +44,51 @@ def _forbid_instantiation(klass, subclasses_instead=True):
     raise TypeError(msg)
 
 
+cdef class FileSource:
+
+    cdef:
+        CFileSource wrapped
+
+    def __cinit__(self, file, FileSystem filesystem=None):
+        cdef:
+            shared_ptr[CFileSystem] c_filesystem
+            c_string c_path
+            shared_ptr[CRandomAccessFile] c_file
+            shared_ptr[CBuffer] c_buffer
+
+        if isinstance(file, FileSource):
+            self.wrapped = (<FileSource> file).wrapped
+
+        elif isinstance(file, Buffer):
+            c_buffer = pyarrow_unwrap_buffer(file)
+            self.wrapped = CFileSource(move(c_buffer))
+
+        elif _is_path_like(file):
+            if filesystem is None:
+                raise ValueError("cannot construct a FileSource from "
+                                 "a path without a FileSystem")
+            c_filesystem = filesystem.unwrap()
+            c_path = tobytes(_stringify_path(file))
+            self.wrapped = CFileSource(move(c_path), move(c_filesystem))
+
+        elif hasattr(file, 'read'):
+            # Optimistically hope this is file-like
+            c_file = get_native_file(file, False).get_random_access_file()
+            self.wrapped = CFileSource(move(c_file))
+
+        else:
+            raise TypeError("cannot construct a FileSource "
+                            "from " + str(file))
+
+    @staticmethod
+    def from_uri(uri):
+        filesystem, path = FileSystem.from_uri(uri)
+        return FileSource(path, filesystem)
+
+    cdef CFileSource unwrap(self) nogil:
+        return self.wrapped
+
+
 cdef class Expression:
 
     cdef:
@@ -226,16 +272,18 @@ cdef class Dataset:
 
     @staticmethod
     cdef wrap(const shared_ptr[CDataset]& sp):
-        cdef Dataset self
+        type_name = frombytes(sp.get().type_name())
 
-        typ = frombytes(sp.get().type_name())
-        if typ == 'union':
-            self = UnionDataset.__new__(UnionDataset)
-        elif typ == 'filesystem':
-            self = FileSystemDataset.__new__(FileSystemDataset)
-        else:
-            raise TypeError(typ)
+        classes = {
+            'union': UnionDataset,
+            'filesystem': FileSystemDataset,
+        }
+
+        class_ = classes.get(type_name, None)
+        if class_ is None:
+            raise TypeError(type_name)
 
+        cdef Dataset self = class_.__new__(class_)
         self.init(sp)
         return self
 
@@ -411,7 +459,7 @@ cdef class FileSystemDataset(Dataset):
 
     Parameters
     ----------
-    paths_or_selector : Union[FileSelector, List[FileInfo]]
+    paths_or_selector : Union[FileSelector, List[str]]
         List of files/directories to consume.
     schema : Schema
         The top-level schema of the DataDataset.
@@ -457,8 +505,7 @@ cdef class FileSystemDataset(Dataset):
 
         infos = filesystem.get_file_info(paths_or_selector)
 
-        if partitions is None:
-            partitions = [_true] * len(infos)
+        partitions = partitions or [_true] * len(infos)
 
         if len(infos) != len(partitions):
             raise ValueError(
@@ -531,50 +578,45 @@ cdef class FileFormat:
 
     @staticmethod
     cdef wrap(const shared_ptr[CFileFormat]& sp):
-        cdef FileFormat self
-
-        typ = frombytes(sp.get().type_name())
-        if typ == 'parquet':
-            self = ParquetFileFormat.__new__(ParquetFileFormat)
-        elif typ == 'ipc':
-            self = IpcFileFormat.__new__(IpcFileFormat)
-        elif typ == 'csv':
-            self = CsvFileFormat.__new__(CsvFileFormat)
-        else:
-            raise TypeError(typ)
+        type_name = frombytes(sp.get().type_name())
+
+        classes = {
+            'ipc': IpcFileFormat,
+            'csv': CsvFileFormat,
+            'parquet': ParquetFileFormat,
+        }
+
+        class_ = classes.get(type_name, None)
+        if class_ is None:
+            raise TypeError(type_name)
 
+        cdef FileFormat self = class_.__new__(class_)
         self.init(sp)
         return self
 
     cdef inline shared_ptr[CFileFormat] unwrap(self):
         return self.wrapped
 
-    def inspect(self, str path not None, FileSystem filesystem not None):
+    def inspect(self, file, filesystem=None):
         """Infer the schema of a file."""
-        cdef:
-            shared_ptr[CSchema] c_schema
-
-        c_schema = GetResultValue(self.format.Inspect(CFileSource(
-            tobytes(path), filesystem.unwrap())))
+        c_source = FileSource(file, filesystem).unwrap()
+        c_schema = GetResultValue(self.format.Inspect(c_source))
         return pyarrow_wrap_schema(move(c_schema))
 
-    def make_fragment(self, str path not None, FileSystem filesystem not None,
+    def make_fragment(self, file, filesystem=None,
                       Expression partition_expression=None):
         """
         Make a FileFragment of this FileFormat. The filter may not reference
         fields absent from the provided schema. If no schema is provided then
         one will be inferred.
         """
-        cdef:
-            shared_ptr[CFileFragment] c_fragment
-
         partition_expression = partition_expression or _true
 
-        c_fragment = GetResultValue(
-            self.format.MakeFragment(CFileSource(tobytes(path),
-                                                 filesystem.unwrap()),
+        c_source = FileSource(file, filesystem).unwrap()
+        c_fragment = <shared_ptr[CFragment]> GetResultValue(
+            self.format.MakeFragment(move(c_source),
                                      partition_expression.unwrap()))
-        return Fragment.wrap(<shared_ptr[CFragment]> move(c_fragment))
+        return Fragment.wrap(move(c_fragment))
 
     def __eq__(self, other):
         try:
@@ -590,26 +632,30 @@ cdef class Fragment:
         shared_ptr[CFragment] wrapped
         CFragment* fragment
 
+    def __init__(self):
+        _forbid_instantiation(self.__class__)
+
     cdef void init(self, const shared_ptr[CFragment]& sp):
         self.wrapped = sp
         self.fragment = sp.get()
 
     @staticmethod
     cdef wrap(const shared_ptr[CFragment]& sp):
-        # there's no discriminant in Fragment, so we can't downcast
-        # to FileFragment for the path property
-        cdef Fragment self = Fragment()
-
-        typ = frombytes(sp.get().type_name())
-        if typ == 'ipc':
-            # IpcFileFormat does not have a corresponding subclass
-            # of FileFragment
-            self = FileFragment.__new__(FileFragment)
-        elif typ == 'parquet':
-            self = ParquetFileFragment.__new__(ParquetFileFragment)
-        else:
-            self = Fragment()
+        type_name = frombytes(sp.get().type_name())
+
+        classes = {
+            # IpcFileFormat and CsvFileFormat do not have corresponding
+            # subclasses of FileFragment
+            'ipc': FileFragment,
+            'csv': FileFragment,
+            'parquet': ParquetFileFragment,
+        }
+
+        class_ = classes.get(type_name, None)
+        if class_ is None:
+            class_ = Fragment
 
+        cdef Fragment self = class_.__new__(class_)
         self.init(sp)
         return self
 
@@ -732,9 +778,13 @@ cdef class FileFragment(Fragment):
         it views a file. If instead it views a buffer, this will be None.
         """
         cdef:
-            shared_ptr[CFileSystem] fs
-        fs = self.file_fragment.source().filesystem()
-        return FileSystem.wrap(fs)
+            shared_ptr[CFileSystem] c_fs
+        c_fs = self.file_fragment.source().filesystem()
+
+        if c_fs.get() == nullptr:
+            return None
+
+        return FileSystem.wrap(c_fs)
 
     @property
     def buffer(self):
@@ -941,27 +991,25 @@ cdef class ParquetFileFormat(FileFormat):
     def __reduce__(self):
         return ParquetFileFormat, (self.read_options,)
 
-    def make_fragment(self, str path not None, FileSystem filesystem not None,
+    def make_fragment(self, file, filesystem=None,
                       Expression partition_expression=None, row_groups=None):
         cdef:
-            shared_ptr[CFileFragment] c_fragment
             vector[int] c_row_groups
 
         partition_expression = partition_expression or _true
 
         if row_groups is None:
-            return super().make_fragment(path, filesystem,
+            return super().make_fragment(file, filesystem,
                                          partition_expression)
 
-        for row_group in set(row_groups):
-            c_row_groups.push_back(<int> row_group)
+        c_source = FileSource(file, filesystem).unwrap()
+        c_row_groups = [<int> row_group for row_group in set(row_groups)]
 
-        c_fragment = GetResultValue(
-            self.parquet_format.MakeFragment(CFileSource(tobytes(path),
-                                                         filesystem.unwrap()),
+        c_fragment = <shared_ptr[CFragment]> GetResultValue(
+            self.parquet_format.MakeFragment(move(c_source),
                                              partition_expression.unwrap(),
                                              move(c_row_groups)))
-        return Fragment.wrap(<shared_ptr[CFragment]> move(c_fragment))
+        return Fragment.wrap(move(c_fragment))
 
 
 cdef class IpcFileFormat(FileFormat):
@@ -1019,16 +1067,18 @@ cdef class Partitioning:
 
     @staticmethod
     cdef wrap(const shared_ptr[CPartitioning]& sp):
-        cdef Partitioning self
+        type_name = frombytes(sp.get().type_name())
 
-        typ = frombytes(sp.get().type_name())
-        if typ == 'schema':
-            self = DirectoryPartitioning.__new__(DirectoryPartitioning)
-        elif typ == 'hive':
-            self = HivePartitioning.__new__(HivePartitioning)
-        else:
-            raise TypeError(typ)
+        classes = {
+            'schema': DirectoryPartitioning,
+            'hive': HivePartitioning,
+        }
+
+        class_ = classes.get(type_name, None)
+        if class_ is None:
+            raise TypeError(type_name)
 
+        cdef Partitioning self = class_.__new__(class_)
         self.init(sp)
         return self
 
@@ -1430,24 +1480,23 @@ cdef class FileSystemDatasetFactory(DatasetFactory):
                  FileSystemFactoryOptions options=None):
         cdef:
             vector[c_string] paths
-            CFileSelector selector
+            CFileSelector c_selector
             CResult[shared_ptr[CDatasetFactory]] result
             shared_ptr[CFileSystem] c_filesystem
             shared_ptr[CFileFormat] c_format
             CFileSystemFactoryOptions c_options
 
-        c_filesystem = filesystem.unwrap()
-        c_format = format.unwrap()
-
         options = options or FileSystemFactoryOptions()
         c_options = options.unwrap()
+        c_filesystem = filesystem.unwrap()
+        c_format = format.unwrap()
 
         if isinstance(paths_or_selector, FileSelector):
             with nogil:
-                selector = (<FileSelector>paths_or_selector).selector
+                c_selector = (<FileSelector> paths_or_selector).selector
                 result = CFileSystemDatasetFactory.MakeFromSelector(
                     c_filesystem,
-                    selector,
+                    c_selector,
                     c_format,
                     c_options
                 )
diff --git a/python/pyarrow/includes/common.pxd 
b/python/pyarrow/includes/common.pxd
index d8ee866..c2ef569 100644
--- a/python/pyarrow/includes/common.pxd
+++ b/python/pyarrow/includes/common.pxd
@@ -116,13 +116,18 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
 
 cdef extern from "arrow/result.h" namespace "arrow" nogil:
     cdef cppclass CResult "arrow::Result"[T]:
+        CResult()
+        CResult(CStatus)
+        CResult(T)
         c_bool ok()
         CStatus status()
         T operator*()
 
+
 cdef extern from "arrow/python/common.h" namespace "arrow::py" nogil:
     T GetResultValue[T](CResult[T]) except *
 
+
 cdef inline object PyObject_to_object(PyObject* o):
     # Cast to "object" increments reference count
     cdef object result = <object> o
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd 
b/python/pyarrow/includes/libarrow_dataset.pxd
index 56140bd..e84ff78 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -195,7 +195,11 @@ cdef extern from "arrow/dataset/api.h" namespace 
"arrow::dataset" nogil:
         const c_string& path() const
         const shared_ptr[CFileSystem]& filesystem() const
         const shared_ptr[CBuffer]& buffer() const
-        CFileSource(c_string path, shared_ptr[CFileSystem] filesystem)
+        # HACK: Cython can't handle all the overloads so don't declare them.
+        # This means invalid construction of CFileSource won't be caught in
+        # the C++ generation phase (though it will still be caught when
+        # the generated C++ is compiled).
+        CFileSource(...)
 
     cdef cppclass CFileFormat "arrow::dataset::FileFormat":
         c_string type_name() const
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index 53cb5de..77ac4f8 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -1376,7 +1376,7 @@ cdef shared_ptr[CBuffer] as_c_buffer(object o) except *:
     return buf
 
 
-cdef NativeFile _get_native_file(object source, c_bool use_memory_map):
+cdef NativeFile get_native_file(object source, c_bool use_memory_map):
     try:
         source_path = _stringify_path(source)
     except TypeError:
@@ -1398,7 +1398,7 @@ cdef get_reader(object source, c_bool use_memory_map,
                 shared_ptr[CRandomAccessFile]* reader):
     cdef NativeFile nf
 
-    nf = _get_native_file(source, use_memory_map)
+    nf = get_native_file(source, use_memory_map)
     reader[0] = nf.get_random_access_file()
 
 
@@ -1418,7 +1418,7 @@ cdef get_input_stream(object source, c_bool 
use_memory_map,
     except TypeError:
         codec = None
 
-    nf = _get_native_file(source, use_memory_map)
+    nf = get_native_file(source, use_memory_map)
     input_stream = nf.get_input_stream()
 
     # codec is None if compression can't be detected
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index 8b757d5..b4cd230 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -579,6 +579,7 @@ cdef get_input_stream(object source, c_bool use_memory_map,
 cdef get_reader(object source, c_bool use_memory_map,
                 shared_ptr[CRandomAccessFile]* reader)
 cdef get_writer(object source, shared_ptr[COutputStream]* writer)
+cdef NativeFile get_native_file(object source, c_bool use_memory_map)
 
 # Default is allow_none=False
 cpdef DataType ensure_type(object type, bint allow_none=*)
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index ef5c3be..c581e0f 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -1390,6 +1390,26 @@ class _ParquetDatasetV2:
                     "Keyword '{0}' is not yet supported with the new "
                     "Dataset API".format(keyword))
 
+        # map format arguments
+        read_options = {}
+        if buffer_size:
+            read_options.update(use_buffered_stream=True,
+                                buffer_size=buffer_size)
+        if read_dictionary is not None:
+            read_options.update(dictionary_columns=read_dictionary)
+        parquet_format = ds.ParquetFileFormat(read_options=read_options)
+
+        # map filters to Expressions
+        self._filters = filters
+        self._filter_expression = filters and _filters_to_expression(filters)
+
+        # check for single NativeFile dataset
+        if not isinstance(path_or_paths, list):
+            if not _is_path_like(path_or_paths):
+                self._fragment = parquet_format.make_fragment(path_or_paths)
+                self._dataset = None
+                return
+
         # map old filesystems to new one
         # TODO(dataset) deal with other file systems
         if isinstance(filesystem, LocalFileSystem):
@@ -1399,27 +1419,16 @@ class _ParquetDatasetV2:
             # path can in principle be URI for any filesystem)
             filesystem = pyarrow.fs.LocalFileSystem(use_mmap=True)
 
-        # map additional arguments
-        read_options = {}
-        if buffer_size:
-            read_options.update(use_buffered_stream=True,
-                                buffer_size=buffer_size)
-        if read_dictionary is not None:
-            read_options.update(dictionary_columns=read_dictionary)
-        parquet_format = ds.ParquetFileFormat(read_options=read_options)
-
         self._dataset = ds.dataset(path_or_paths, filesystem=filesystem,
                                    format=parquet_format,
                                    partitioning=partitioning)
-        self._filters = filters
-        if filters is not None:
-            self._filter_expression = _filters_to_expression(filters)
-        else:
-            self._filter_expression = None
+        self._fragment = None
 
     @property
     def schema(self):
-        return self._dataset.schema
+        if self._dataset is not None:
+            return self._dataset.schema
+        return self._fragment.physical_schema
 
     def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
         """
@@ -1444,13 +1453,17 @@ class _ParquetDatasetV2:
         """
         # if use_pandas_metadata, we need to include index columns in the
         # column selection, to be able to restore those in the pandas DataFrame
-        metadata = self._dataset.schema.metadata
+        metadata = self.schema.metadata
         if columns is not None and use_pandas_metadata:
             if metadata and b'pandas' in metadata:
-                index_columns = set(_get_pandas_index_columns(metadata))
-                columns = columns + list(index_columns - set(columns))
-
-        table = self._dataset.to_table(
+                # RangeIndex can be represented as dict instead of column name
+                index_columns = [
+                    col for col in _get_pandas_index_columns(metadata)
+                    if not isinstance(col, dict)
+                ]
+                columns = columns + list(set(index_columns) - set(columns))
+
+        table = (self._dataset or self._fragment).to_table(
             columns=columns, filter=self._filter_expression,
             use_threads=use_threads
         )
@@ -1521,10 +1534,6 @@ def read_table(source, columns=None, use_threads=True, 
metadata=None,
                read_dictionary=None, filesystem=None, filters=None,
                buffer_size=0, partitioning="hive", use_legacy_dataset=True):
     if not use_legacy_dataset:
-        if not _is_path_like(source):
-            raise ValueError("File-like objects are not yet supported with "
-                             "the new Dataset API")
-
         dataset = _ParquetDatasetV2(
             source,
             filesystem=filesystem,
diff --git a/python/pyarrow/tests/test_parquet.py 
b/python/pyarrow/tests/test_parquet.py
index 52b44ba..cd86df2 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -89,18 +89,11 @@ def _roundtrip_table(table, read_table_kwargs=None,
     read_table_kwargs = read_table_kwargs or {}
     write_table_kwargs = write_table_kwargs or {}
 
-    if use_legacy_dataset:
-        buf = io.BytesIO()
-        _write_table(table, buf, **write_table_kwargs)
-        buf.seek(0)
-        return _read_table(buf, **read_table_kwargs)
-    else:
-        from pyarrow.fs import _MockFileSystem
-        mockfs = _MockFileSystem()
-        with mockfs.open_output_stream("test") as out:
-            _write_table(table, out, **write_table_kwargs)
-        return _read_table("test", filesystem=mockfs, use_legacy_dataset=False,
-                           **read_table_kwargs)
+    buf = io.BytesIO()
+    _write_table(table, buf, **write_table_kwargs)
+    buf.seek(0)
+    return _read_table(buf, use_legacy_dataset=use_legacy_dataset,
+                       **read_table_kwargs)
 
 
 def _check_roundtrip(table, expected=None, read_table_kwargs=None,
@@ -349,7 +342,7 @@ def 
test_nested_list_nonnullable_roundtrip_bug(use_legacy_dataset):
 
 
 @pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
 def test_pandas_parquet_datetime_tz(use_legacy_dataset):
     s = pd.Series([datetime.datetime(2017, 9, 6)])
     s = s.dt.tz_localize('utc')
@@ -368,7 +361,7 @@ def test_pandas_parquet_datetime_tz(use_legacy_dataset):
     _write_table(arrow_table, f, coerce_timestamps='ms')
     f.seek(0)
 
-    table_read = pq.read_pandas(f)
+    table_read = pq.read_pandas(f, use_legacy_dataset=use_legacy_dataset)
 
     df_read = table_read.to_pandas()
     tm.assert_frame_equal(df, df_read)
@@ -572,9 +565,8 @@ def _test_dataframe(size=10000, seed=0):
     return df
 
 
-# TODO(ARROW-8074) NativeFile support
 @pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
 def test_pandas_parquet_native_file_roundtrip(tempdir, use_legacy_dataset):
     df = _test_dataframe(10000)
     arrow_table = pa.Table.from_pandas(df)
@@ -588,7 +580,7 @@ def test_pandas_parquet_native_file_roundtrip(tempdir, 
use_legacy_dataset):
 
 
 @pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
 def test_parquet_incremental_file_build(tempdir, use_legacy_dataset):
     df = _test_dataframe(100)
     df['unique_id'] = 0
@@ -617,7 +609,7 @@ def test_parquet_incremental_file_build(tempdir, 
use_legacy_dataset):
 
 
 @pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
 def test_read_pandas_column_subset(tempdir, use_legacy_dataset):
     df = _test_dataframe(10000)
     arrow_table = pa.Table.from_pandas(df)
@@ -633,7 +625,7 @@ def test_read_pandas_column_subset(tempdir, 
use_legacy_dataset):
 
 
 @pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
 def test_pandas_parquet_empty_roundtrip(tempdir, use_legacy_dataset):
     df = _test_dataframe(0)
     arrow_table = pa.Table.from_pandas(df)
@@ -672,7 +664,7 @@ def test_pandas_can_write_nested_data(tempdir):
 
 
 @pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
 def test_pandas_parquet_pyfile_roundtrip(tempdir, use_legacy_dataset):
     filename = tempdir / 'pandas_pyfile_roundtrip.parquet'
     size = 5
@@ -1459,7 +1451,7 @@ def test_fixed_size_binary():
 
 
 @pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
 def test_multithreaded_read(use_legacy_dataset):
     df = alltypes_sample(size=10000)
 
@@ -1480,7 +1472,7 @@ def test_multithreaded_read(use_legacy_dataset):
 
 
 @pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
 def test_min_chunksize(use_legacy_dataset):
     data = pd.DataFrame([np.arange(4)], columns=['A', 'B', 'C', 'D'])
     table = pa.Table.from_pandas(data.reset_index())
@@ -3325,7 +3317,7 @@ def test_decimal_roundtrip_negative_scale(tempdir):
 
 
 @pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
 def test_parquet_writer_context_obj(tempdir, use_legacy_dataset):
     df = _test_dataframe(100)
     df['unique_id'] = 0
@@ -3352,7 +3344,7 @@ def test_parquet_writer_context_obj(tempdir, 
use_legacy_dataset):
 
 
 @pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
 def test_parquet_writer_context_obj_with_exception(
     tempdir, use_legacy_dataset
 ):
@@ -3388,7 +3380,7 @@ def test_parquet_writer_context_obj_with_exception(
 
 
 @pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
 def test_zlib_compression_bug(use_legacy_dataset):
     # ARROW-3514: "zlib deflate failed, output buffer too small"
     table = pa.Table.from_arrays([pa.array(['abc', 'def'])], ['some_col'])
@@ -3448,7 +3440,7 @@ def test_empty_row_groups(tempdir):
 
 
 @pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
 def test_parquet_writer_with_caller_provided_filesystem(use_legacy_dataset):
     out = pa.BufferOutputStream()
 
@@ -3558,7 +3550,7 @@ def test_read_column_invalid_index():
 
 
 @pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
 def test_direct_read_dictionary(use_legacy_dataset):
     # ARROW-3325
     repeats = 10
@@ -3610,7 +3602,7 @@ def test_dataset_read_dictionary(tempdir, 
use_legacy_dataset):
 
 
 @pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset_not_supported  # ARROW-8799
 def test_direct_read_dictionary_subfield(use_legacy_dataset):
     repeats = 10
     nunique = 5
@@ -3726,7 +3718,7 @@ def test_parquet_file_too_small(tempdir, 
use_legacy_dataset):
 
 
 @pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
 def test_categorical_index_survives_roundtrip(use_legacy_dataset):
     # ARROW-3652, addressed by ARROW-3246
     df = pd.DataFrame([['a', 'b'], ['c', 'd']], columns=['c1', 'c2'])
@@ -3743,7 +3735,7 @@ def 
test_categorical_index_survives_roundtrip(use_legacy_dataset):
 
 
 @pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
 def test_categorical_order_survives_roundtrip(use_legacy_dataset):
     # ARROW-6302
     df = pd.DataFrame({"a": pd.Categorical(
@@ -3767,7 +3759,7 @@ def _simple_table_write_read(table):
     return pq.read_table(pa.BufferReader(contents))
 
 
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
 def test_dictionary_array_automatically_read(use_legacy_dataset):
     # ARROW-3246
 
@@ -3833,7 +3825,7 @@ def test_field_id_metadata():
 
 
 @pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
 def test_pandas_categorical_na_type_row_groups(use_legacy_dataset):
     # ARROW-5085
     df = pd.DataFrame({"col": [None] * 100, "int": [1.0] * 100})
@@ -3853,7 +3845,7 @@ def 
test_pandas_categorical_na_type_row_groups(use_legacy_dataset):
 
 
 @pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
 def test_pandas_categorical_roundtrip(use_legacy_dataset):
     # ARROW-5480, this was enabled by ARROW-3246
 
@@ -4021,7 +4013,7 @@ def test_table_large_metadata():
     _check_roundtrip(table)
 
 
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
 @pytest.mark.parametrize('array_factory', [
     lambda: pa.array([0, None] * 10),
     lambda: pa.array([0, None] * 10).dictionary_encode(),

Reply via email to