This is an automated email from the ASF dual-hosted git repository.
jorisvandenbossche pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 9d7dca6 ARROW-8074: [C++][Dataset][Python] FileFragments from buffers
and NativeFiles
9d7dca6 is described below
commit 9d7dca643fe60b1131c62646f4a3849262bb46b9
Author: Benjamin Kietzman <[email protected]>
AuthorDate: Thu Jun 18 14:08:32 2020 +0200
ARROW-8074: [C++][Dataset][Python] FileFragments from buffers and
NativeFiles
Adds `ds.FileSource`, which represents an openable file and may be
initialized from a `path, filesystem`, a `Buffer`, or any python object which
can be wrapped by `NativeFile`.
`test_parquet.py` now uses `BytesIO` as the roundtrip medium for non-legacy
`ParquetDataset` instead of resorting to a mock filesystem. Other than that the
integration with Python is somewhat haphazard; I'm thinking we need to rewrite
some of the APIs to be less magical about figuring out what is a selector,
path, list(paths), etc since we will be adding buffers and `NativeFile`s to the
mix.
Closes #7156 from bkietz/8047-FileFragments-from-NativeFile
Lead-authored-by: Benjamin Kietzman <[email protected]>
Co-authored-by: Joris Van den Bossche <[email protected]>
Signed-off-by: Joris Van den Bossche <[email protected]>
---
cpp/examples/arrow/dataset-parquet-scan-example.cc | 6 +-
cpp/src/arrow/dataset/file_base.cc | 39 ++--
cpp/src/arrow/dataset/file_base.h | 140 +++++++++------
cpp/src/arrow/dataset/file_ipc.cc | 6 +-
cpp/src/arrow/dataset/file_ipc.h | 2 +-
cpp/src/arrow/dataset/file_ipc_test.cc | 4 +-
cpp/src/arrow/dataset/file_test.cc | 14 +-
cpp/src/arrow/flight/client.cc | 12 +-
cpp/src/arrow/python/common.cc | 7 -
cpp/src/arrow/python/common.h | 6 +-
cpp/src/arrow/python/numpy_convert.cc | 4 +-
cpp/src/arrow/python/serialize.cc | 2 +-
cpp/src/arrow/result.h | 7 +-
cpp/src/arrow/status.h | 5 +-
cpp/src/arrow/util/checked_cast.h | 7 +-
dev/archery/archery/cli.py | 2 +-
python/pyarrow/_dataset.pyx | 199 +++++++++++++--------
python/pyarrow/includes/common.pxd | 5 +
python/pyarrow/includes/libarrow_dataset.pxd | 6 +-
python/pyarrow/io.pxi | 6 +-
python/pyarrow/lib.pxd | 1 +
python/pyarrow/parquet.py | 57 +++---
python/pyarrow/tests/test_parquet.py | 60 +++----
23 files changed, 340 insertions(+), 257 deletions(-)
diff --git a/cpp/examples/arrow/dataset-parquet-scan-example.cc
b/cpp/examples/arrow/dataset-parquet-scan-example.cc
index 40e1556..ed4b89d 100644
--- a/cpp/examples/arrow/dataset-parquet-scan-example.cc
+++ b/cpp/examples/arrow/dataset-parquet-scan-example.cc
@@ -109,7 +109,8 @@ std::shared_ptr<ds::Dataset> GetDatasetFromFile(
std::string file) {
ds::FileSystemFactoryOptions options;
// The factory will try to build a child dataset.
- auto factory = ds::FileSystemDatasetFactory::Make(fs, {file}, format,
options).ValueOrDie();
+ auto factory =
+ ds::FileSystemDatasetFactory::Make(fs, {file}, format,
options).ValueOrDie();
// Try to infer a common schema for all files.
auto schema = factory->Inspect(conf.inspect_options).ValueOrDie();
@@ -117,7 +118,8 @@ std::shared_ptr<ds::Dataset> GetDatasetFromFile(
// with the previous one, e.g. `factory->Finish(compatible_schema)`.
auto child = factory->Finish(conf.finish_options).ValueOrDie();
- ds::DatasetVector children{conf.repeat, child};
+ ds::DatasetVector children;
+ children.resize(conf.repeat, child);
auto dataset = ds::UnionDataset::Make(std::move(schema),
std::move(children));
return dataset.ValueOrDie();
diff --git a/cpp/src/arrow/dataset/file_base.cc
b/cpp/src/arrow/dataset/file_base.cc
index 8f663bb..a39289d 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -35,24 +35,23 @@ namespace arrow {
namespace dataset {
Result<std::shared_ptr<arrow::io::RandomAccessFile>> FileSource::Open() const {
- if (id() == PATH) {
- return filesystem()->OpenInputFile(path());
+ if (filesystem_) {
+ return filesystem_->OpenInputFile(path_);
}
- return std::make_shared<::arrow::io::BufferReader>(buffer());
-}
-
-Result<std::shared_ptr<arrow::io::OutputStream>> FileSource::OpenWritable()
const {
- if (!writable_) {
- return Status::Invalid("file source '", path(), "' is not writable");
+ if (buffer_) {
+ return std::make_shared<::arrow::io::BufferReader>(buffer_);
}
- if (id() == PATH) {
- return filesystem()->OpenOutputStream(path());
+ return custom_open_();
+}
+
+Result<std::shared_ptr<arrow::io::OutputStream>> WritableFileSource::Open()
const {
+ if (filesystem_) {
+ return filesystem_->OpenOutputStream(path_);
}
- auto b = internal::checked_pointer_cast<ResizableBuffer>(buffer());
- return std::make_shared<::arrow::io::BufferOutputStream>(b);
+ return std::make_shared<::arrow::io::BufferOutputStream>(buffer_);
}
Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(FileSource
source) {
@@ -66,7 +65,7 @@ Result<std::shared_ptr<FileFragment>>
FileFormat::MakeFragment(
}
Result<std::shared_ptr<WriteTask>> FileFormat::WriteFragment(
- FileSource destination, std::shared_ptr<Fragment> fragment,
+ WritableFileSource destination, std::shared_ptr<Fragment> fragment,
std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<ScanContext> scan_context) {
return Status::NotImplemented("writing fragment of format ", type_name());
@@ -168,15 +167,15 @@ Result<std::shared_ptr<FileSystemDataset>>
FileSystemDataset::Write(
const auto& input_fragment = op.fragment();
FileSource dest(path, filesystem);
- ARROW_ASSIGN_OR_RAISE(
- auto fragment,
- plan.format->MakeFragment(dest,
input_fragment->partition_expression()));
- fragments.push_back(std::move(fragment));
+ ARROW_ASSIGN_OR_RAISE(auto write_task,
+ plan.format->WriteFragment({path, filesystem},
input_fragment,
+ scan_options,
scan_context));
+ task_group->Append([write_task] { return write_task->Execute(); });
ARROW_ASSIGN_OR_RAISE(
- auto write_task,
- plan.format->WriteFragment(dest, input_fragment, scan_options,
scan_context));
- task_group->Append([write_task] { return write_task->Execute(); });
+ auto fragment, plan.format->MakeFragment(
+ {path, filesystem},
input_fragment->partition_expression()));
+ fragments.push_back(std::move(fragment));
}
}
diff --git a/cpp/src/arrow/dataset/file_base.h
b/cpp/src/arrow/dataset/file_base.h
index 9d747b4..466ed0f 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -19,6 +19,7 @@
#pragma once
+#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
@@ -44,88 +45,109 @@ namespace dataset {
/// be read like a file
class ARROW_DS_EXPORT FileSource {
public:
- // NOTE(kszucs): it'd be better to separate the BufferSource from FileSource
- enum SourceKind { PATH, BUFFER };
-
FileSource(std::string path, std::shared_ptr<fs::FileSystem> filesystem,
- Compression::type compression = Compression::UNCOMPRESSED,
- bool writable = true)
- : source_kind_(PATH),
- path_(std::move(path)),
+ Compression::type compression = Compression::UNCOMPRESSED)
+ : path_(std::move(path)),
filesystem_(std::move(filesystem)),
- compression_(compression),
- writable_(writable) {}
+ compression_(compression) {}
explicit FileSource(std::shared_ptr<Buffer> buffer,
Compression::type compression =
Compression::UNCOMPRESSED)
- : source_kind_(BUFFER), buffer_(std::move(buffer)),
compression_(compression) {}
+ : buffer_(std::move(buffer)), compression_(compression) {}
+
+ using CustomOpen =
std::function<Result<std::shared_ptr<io::RandomAccessFile>>()>;
+ explicit FileSource(CustomOpen open) : custom_open_(std::move(open)) {}
- explicit FileSource(std::shared_ptr<ResizableBuffer> buffer,
+ using CustomOpenWithCompression =
+
std::function<Result<std::shared_ptr<io::RandomAccessFile>>(Compression::type)>;
+ explicit FileSource(CustomOpenWithCompression open_with_compression,
Compression::type compression =
Compression::UNCOMPRESSED)
- : source_kind_(BUFFER),
- buffer_(std::move(buffer)),
- compression_(compression),
- writable_(true) {}
-
- bool operator==(const FileSource& other) const {
- if (id() != other.id()) {
- return false;
- }
+ : custom_open_(std::bind(std::move(open_with_compression), compression)),
+ compression_(compression) {}
- if (id() == PATH) {
- return path() == other.path() && filesystem() == other.filesystem();
- }
+ explicit FileSource(std::shared_ptr<io::RandomAccessFile> file,
+ Compression::type compression =
Compression::UNCOMPRESSED)
+ : custom_open_([=] { return ToResult(file); }),
compression_(compression) {}
- return buffer()->Equals(*other.buffer());
- }
+ FileSource() : custom_open_(CustomOpen{&InvalidOpen}) {}
- /// \brief The kind of file, whether stored in a filesystem, memory
- /// resident, or other
- SourceKind id() const { return source_kind_; }
+ static std::vector<FileSource> FromPaths(const
std::shared_ptr<fs::FileSystem>& fs,
+ std::vector<std::string> paths) {
+ std::vector<FileSource> sources;
+ for (auto&& path : paths) {
+ sources.emplace_back(std::move(path), fs);
+ }
+ return sources;
+ }
- /// \brief Return the type of raw compression on the file, if any
+ /// \brief Return the type of raw compression on the file, if any.
Compression::type compression() const { return compression_; }
- /// \brief Whether the this source may be opened writable
- bool writable() const { return writable_; }
-
- /// \brief Return the file path, if any. Only valid when file source
- /// type is PATH
+ /// \brief Return the file path, if any. Only valid when file source wraps a
path.
const std::string& path() const {
static std::string buffer_path = "<Buffer>";
- return id() == PATH ? path_ : buffer_path;
+ static std::string custom_open_path = "<Buffer>";
+ return filesystem_ ? path_ : buffer_ ? buffer_path : custom_open_path;
}
- /// \brief Return the filesystem, if any. Only non null when file
- /// source type is PATH
- const std::shared_ptr<fs::FileSystem>& filesystem() const {
- static std::shared_ptr<fs::FileSystem> no_fs = NULLPTR;
- return id() == PATH ? filesystem_ : no_fs;
- }
+ /// \brief Return the filesystem, if any. Otherwise returns nullptr
+ const std::shared_ptr<fs::FileSystem>& filesystem() const { return
filesystem_; }
- /// \brief Return the buffer containing the file, if any. Only value
- /// when file source type is BUFFER
- const std::shared_ptr<Buffer>& buffer() const {
- static std::shared_ptr<Buffer> path_buffer = NULLPTR;
- return id() == BUFFER ? buffer_ : path_buffer;
- }
+ /// \brief Return the buffer containing the file, if any. Otherwise returns
nullptr
+ const std::shared_ptr<Buffer>& buffer() const { return buffer_; }
/// \brief Get a RandomAccessFile which views this file source
- Result<std::shared_ptr<arrow::io::RandomAccessFile>> Open() const;
-
- /// \brief Get an OutputStream which wraps this file source
- Result<std::shared_ptr<arrow::io::OutputStream>> OpenWritable() const;
+ Result<std::shared_ptr<io::RandomAccessFile>> Open() const;
private:
- SourceKind source_kind_;
+ static Result<std::shared_ptr<io::RandomAccessFile>> InvalidOpen() {
+ return Status::Invalid("Called Open() on an uninitialized FileSource");
+ }
std::string path_;
std::shared_ptr<fs::FileSystem> filesystem_;
-
std::shared_ptr<Buffer> buffer_;
+ CustomOpen custom_open_;
+ Compression::type compression_ = Compression::UNCOMPRESSED;
+};
+
+/// \brief The path and filesystem where an actual file is located or a buffer
which can
+/// be written to like a file
+class ARROW_DS_EXPORT WritableFileSource {
+ public:
+ WritableFileSource(std::string path, std::shared_ptr<fs::FileSystem>
filesystem,
+ Compression::type compression = Compression::UNCOMPRESSED)
+ : path_(std::move(path)),
+ filesystem_(std::move(filesystem)),
+ compression_(compression) {}
+
+ explicit WritableFileSource(std::shared_ptr<ResizableBuffer> buffer,
+ Compression::type compression =
Compression::UNCOMPRESSED)
+ : buffer_(std::move(buffer)), compression_(compression) {}
+
+ /// \brief Return the type of raw compression on the file, if any
+ Compression::type compression() const { return compression_; }
+
+ /// \brief Return the file path, if any. Only valid when file source wraps a
path.
+ const std::string& path() const {
+ static std::string buffer_path = "<Buffer>";
+ return filesystem_ ? path_ : buffer_path;
+ }
+
+ /// \brief Return the filesystem, if any. Otherwise returns nullptr
+ const std::shared_ptr<fs::FileSystem>& filesystem() const { return
filesystem_; }
- Compression::type compression_;
- bool writable_ = false;
+ /// \brief Return the buffer containing the file, if any. Otherwise returns
nullptr
+ const std::shared_ptr<ResizableBuffer>& buffer() const { return buffer_; }
+
+ /// \brief Get an OutputStream which wraps this file source
+ Result<std::shared_ptr<arrow::io::OutputStream>> Open() const;
+
+ private:
+ std::string path_;
+ std::shared_ptr<fs::FileSystem> filesystem_;
+ std::shared_ptr<ResizableBuffer> buffer_;
+ Compression::type compression_ = Compression::UNCOMPRESSED;
};
/// \brief Base class for file format implementation
@@ -159,7 +181,7 @@ class ARROW_DS_EXPORT FileFormat : public
std::enable_shared_from_this<FileForma
/// \brief Write a fragment. If the parent directory of destination does not
exist, it
/// will be created.
virtual Result<std::shared_ptr<WriteTask>> WriteFragment(
- FileSource destination, std::shared_ptr<Fragment> fragment,
+ WritableFileSource destination, std::shared_ptr<Fragment> fragment,
std::shared_ptr<ScanOptions> options,
std::shared_ptr<ScanContext> scan_context); // FIXME(bkietz) make this
pure virtual
};
@@ -256,16 +278,16 @@ class ARROW_DS_EXPORT WriteTask {
virtual ~WriteTask() = default;
- const FileSource& destination() const;
+ const WritableFileSource& destination() const;
const std::shared_ptr<FileFormat>& format() const { return format_; }
protected:
- WriteTask(FileSource destination, std::shared_ptr<FileFormat> format)
+ WriteTask(WritableFileSource destination, std::shared_ptr<FileFormat> format)
: destination_(std::move(destination)), format_(std::move(format)) {}
Status CreateDestinationParentDir() const;
- FileSource destination_;
+ WritableFileSource destination_;
std::shared_ptr<FileFormat> format_;
};
diff --git a/cpp/src/arrow/dataset/file_ipc.cc
b/cpp/src/arrow/dataset/file_ipc.cc
index 6a509cd..f080a25 100644
--- a/cpp/src/arrow/dataset/file_ipc.cc
+++ b/cpp/src/arrow/dataset/file_ipc.cc
@@ -160,7 +160,7 @@ Result<ScanTaskIterator> IpcFileFormat::ScanFile(
class IpcWriteTask : public WriteTask {
public:
- IpcWriteTask(FileSource destination, std::shared_ptr<FileFormat> format,
+ IpcWriteTask(WritableFileSource destination, std::shared_ptr<FileFormat>
format,
std::shared_ptr<Fragment> fragment,
std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<ScanContext> scan_context)
@@ -174,7 +174,7 @@ class IpcWriteTask : public WriteTask {
auto schema = scan_options_->schema();
- ARROW_ASSIGN_OR_RAISE(auto out_stream, destination_.OpenWritable());
+ ARROW_ASSIGN_OR_RAISE(auto out_stream, destination_.Open());
ARROW_ASSIGN_OR_RAISE(auto writer, ipc::NewFileWriter(out_stream.get(),
schema));
ARROW_ASSIGN_OR_RAISE(auto scan_task_it,
fragment_->Scan(scan_options_, scan_context_));
@@ -200,7 +200,7 @@ class IpcWriteTask : public WriteTask {
};
Result<std::shared_ptr<WriteTask>> IpcFileFormat::WriteFragment(
- FileSource destination, std::shared_ptr<Fragment> fragment,
+ WritableFileSource destination, std::shared_ptr<Fragment> fragment,
std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<ScanContext> scan_context) {
return std::make_shared<IpcWriteTask>(std::move(destination),
shared_from_this(),
diff --git a/cpp/src/arrow/dataset/file_ipc.h b/cpp/src/arrow/dataset/file_ipc.h
index eb465bf..70db5e9 100644
--- a/cpp/src/arrow/dataset/file_ipc.h
+++ b/cpp/src/arrow/dataset/file_ipc.h
@@ -48,7 +48,7 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat {
std::shared_ptr<ScanContext> context)
const override;
Result<std::shared_ptr<WriteTask>> WriteFragment(
- FileSource destination, std::shared_ptr<Fragment> fragment,
+ WritableFileSource destination, std::shared_ptr<Fragment> fragment,
std::shared_ptr<ScanOptions> options,
std::shared_ptr<ScanContext> context) override;
};
diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc
b/cpp/src/arrow/dataset/file_ipc_test.cc
index 827ca5e..747dccf 100644
--- a/cpp/src/arrow/dataset/file_ipc_test.cc
+++ b/cpp/src/arrow/dataset/file_ipc_test.cc
@@ -87,10 +87,10 @@ class TestIpcFileFormat : public ArrowIpcWriterMixin {
kBatchRepetitions);
}
- Result<FileSource> GetFileSink() {
+ Result<WritableFileSource> GetFileSink() {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ResizableBuffer> buffer,
AllocateResizableBuffer(0));
- return FileSource(std::move(buffer));
+ return WritableFileSource(std::move(buffer));
}
RecordBatchIterator Batches(ScanTaskIterator scan_task_it) {
diff --git a/cpp/src/arrow/dataset/file_test.cc
b/cpp/src/arrow/dataset/file_test.cc
index 8651c2d..a2d3add 100644
--- a/cpp/src/arrow/dataset/file_test.cc
+++ b/cpp/src/arrow/dataset/file_test.cc
@@ -49,17 +49,17 @@ TEST(FileSource, PathBased) {
ASSERT_EQ(p1, source1.path());
ASSERT_TRUE(localfs->Equals(*source1.filesystem()));
- ASSERT_EQ(FileSource::PATH, source1.id());
ASSERT_EQ(Compression::UNCOMPRESSED, source1.compression());
ASSERT_EQ(p2, source2.path());
ASSERT_TRUE(localfs->Equals(*source2.filesystem()));
- ASSERT_EQ(FileSource::PATH, source2.id());
ASSERT_EQ(Compression::GZIP, source2.compression());
// Test copy constructor and comparison
- FileSource source3 = source1;
- ASSERT_EQ(source1, source3);
+ FileSource source3;
+ source3 = source1;
+ ASSERT_EQ(source1.path(), source3.path());
+ ASSERT_EQ(source1.filesystem(), source3.filesystem());
}
TEST(FileSource, BufferBased) {
@@ -69,13 +69,15 @@ TEST(FileSource, BufferBased) {
FileSource source1(buf);
FileSource source2(buf, Compression::LZ4);
- ASSERT_EQ(FileSource::BUFFER, source1.id());
ASSERT_TRUE(source1.buffer()->Equals(*buf));
ASSERT_EQ(Compression::UNCOMPRESSED, source1.compression());
- ASSERT_EQ(FileSource::BUFFER, source2.id());
ASSERT_TRUE(source2.buffer()->Equals(*buf));
ASSERT_EQ(Compression::LZ4, source2.compression());
+
+ FileSource source3;
+ source3 = source1;
+ ASSERT_EQ(source1.buffer(), source3.buffer());
}
TEST_F(TestFileSystemDataset, Basic) {
diff --git a/cpp/src/arrow/flight/client.cc b/cpp/src/arrow/flight/client.cc
index a0fd3ff..c228f71 100644
--- a/cpp/src/arrow/flight/client.cc
+++ b/cpp/src/arrow/flight/client.cc
@@ -131,7 +131,7 @@ class FinishableStream {
std::shared_ptr<Stream> stream() const { return stream_; }
/// \brief Finish the call, adding server context to the given status.
- virtual Status Finish(Status&& st) {
+ virtual Status Finish(Status st) {
if (finished_) {
return MergeStatus(std::move(st));
}
@@ -202,7 +202,7 @@ class FinishableWritableStream : public
FinishableStream<Stream, ReadT> {
return Status::OK();
}
- Status Finish(Status&& st) override {
+ Status Finish(Status st) override {
// This may be used concurrently by reader/writer side of a
// stream, so it needs to be protected.
std::lock_guard<std::mutex> guard(finish_mutex_);
@@ -223,14 +223,14 @@ class FinishableWritableStream : public
FinishableStream<Stream, ReadT> {
bool finished_writes = done_writing_ || this->stream()->WritesDone();
done_writing_ = true;
- Status result = FinishableStream<Stream, ReadT>::Finish(std::move(st));
+ st = FinishableStream<Stream, ReadT>::Finish(std::move(st));
if (!finished_writes) {
return Status::FromDetailAndArgs(
- result.code(), result.detail(), result.message(),
+ st.code(), st.detail(), st.message(),
". Additionally, could not finish writing record batches before
closing");
}
- return result;
+ return st;
}
private:
@@ -444,7 +444,7 @@ class GrpcIpcMessageReader : public ipc::MessageReader {
// Validate IPC message
auto result = data->OpenMessage();
if (!result.ok()) {
- return stream_->Finish(result.status());
+ return stream_->Finish(std::move(result).status());
}
*app_metadata_ = std::move(data->app_metadata);
return result;
diff --git a/cpp/src/arrow/python/common.cc b/cpp/src/arrow/python/common.cc
index 5b7b36e..8ff3485 100644
--- a/cpp/src/arrow/python/common.cc
+++ b/cpp/src/arrow/python/common.cc
@@ -152,13 +152,6 @@ Status ConvertPyError(StatusCode code) {
return Status(code, message, detail);
}
-Status PassPyError() {
- if (PyErr_Occurred()) {
- return ConvertPyError();
- }
- return Status::OK();
-}
-
bool IsPyError(const Status& status) {
if (status.ok()) {
return false;
diff --git a/cpp/src/arrow/python/common.h b/cpp/src/arrow/python/common.h
index e2757ed..c38a6f8 100644
--- a/cpp/src/arrow/python/common.h
+++ b/cpp/src/arrow/python/common.h
@@ -38,8 +38,6 @@ namespace py {
// Convert current Python error to a Status. The Python error state is cleared
// and can be restored with RestorePyError().
ARROW_PYTHON_EXPORT Status ConvertPyError(StatusCode code =
StatusCode::UnknownError);
-// Same as ConvertPyError(), but returns Status::OK() if no Python error is
set.
-ARROW_PYTHON_EXPORT Status PassPyError();
// Query whether the given Status is a Python error (as wrapped by
ConvertPyError()).
ARROW_PYTHON_EXPORT bool IsPyError(const Status& status);
// Restore a Python error wrapped in a Status.
@@ -55,9 +53,9 @@ inline Status CheckPyError(StatusCode code =
StatusCode::UnknownError) {
}
}
-#define RETURN_IF_PYERROR() ARROW_RETURN_NOT_OK(CheckPyError());
+#define RETURN_IF_PYERROR() ARROW_RETURN_NOT_OK(CheckPyError())
-#define PY_RETURN_IF_ERROR(CODE) ARROW_RETURN_NOT_OK(CheckPyError(CODE));
+#define PY_RETURN_IF_ERROR(CODE) ARROW_RETURN_NOT_OK(CheckPyError(CODE))
// For Cython, as you can't define template C++ functions in Cython, only use
them.
// This function can set a Python exception. It assumes that T has a (cheap)
diff --git a/cpp/src/arrow/python/numpy_convert.cc
b/cpp/src/arrow/python/numpy_convert.cc
index cd8da8d..85c5f33 100644
--- a/cpp/src/arrow/python/numpy_convert.cc
+++ b/cpp/src/arrow/python/numpy_convert.cc
@@ -277,7 +277,7 @@ Status TensorToNdarray(const std::shared_ptr<Tensor>&
tensor, PyObject* base,
PyObject* result =
PyArray_NewFromDescr(&PyArray_Type, dtype, ndim, npy_shape.data(),
npy_strides.data(), mutable_data, array_flags,
nullptr);
- RETURN_IF_PYERROR()
+ RETURN_IF_PYERROR();
if (base == Py_None || base == nullptr) {
base = py::wrap_tensor(tensor);
@@ -309,7 +309,7 @@ static Status SparseTensorDataToNdarray(const SparseTensor&
sparse_tensor,
*out_data = PyArray_NewFromDescr(&PyArray_Type, dtype_data,
static_cast<int>(data_shape.size()),
data_shape.data(),
nullptr, mutable_data, array_flags,
nullptr);
- RETURN_IF_PYERROR()
+ RETURN_IF_PYERROR();
Py_XINCREF(base);
PyArray_SetBaseObject(reinterpret_cast<PyArrayObject*>(*out_data), base);
return Status::OK();
diff --git a/cpp/src/arrow/python/serialize.cc
b/cpp/src/arrow/python/serialize.cc
index 9d10041..a4645b7 100644
--- a/cpp/src/arrow/python/serialize.cc
+++ b/cpp/src/arrow/python/serialize.cc
@@ -371,7 +371,7 @@ Status CallCustomCallback(PyObject* context, PyObject*
method_name, PyObject* el
": handler not registered");
} else {
*result = PyObject_CallMethodObjArgs(context, method_name, elem, NULL);
- return PassPyError();
+ return CheckPyError();
}
}
diff --git a/cpp/src/arrow/result.h b/cpp/src/arrow/result.h
index b492057..96863ec 100644
--- a/cpp/src/arrow/result.h
+++ b/cpp/src/arrow/result.h
@@ -298,7 +298,7 @@ class ARROW_MUST_USE_TYPE Result : public
util::EqualityComparable<Result<T>> {
///
/// \return The stored non-OK status object, or an OK status if this object
/// has a value.
- Status status() const { return status_; }
+ const Status& status() const { return status_; }
/// Gets the stored `T` value.
///
@@ -462,4 +462,9 @@ inline Status GenericToStatus(Result<T>&& res) {
} // namespace internal
+template <typename T>
+Result<T> ToResult(T t) {
+ return Result<T>(std::move(t));
+}
+
} // namespace arrow
diff --git a/cpp/src/arrow/status.h b/cpp/src/arrow/status.h
index 33ca303..36f1b90 100644
--- a/cpp/src/arrow/status.h
+++ b/cpp/src/arrow/status.h
@@ -306,8 +306,9 @@ class ARROW_MUST_USE_TYPE ARROW_EXPORT Status : public
util::EqualityComparable<
std::string message() const { return ok() ? "" : state_->msg; }
/// \brief Return the status detail attached to this message.
- std::shared_ptr<StatusDetail> detail() const {
- return state_ == NULLPTR ? NULLPTR : state_->detail;
+ const std::shared_ptr<StatusDetail>& detail() const {
+ static std::shared_ptr<StatusDetail> no_detail = NULLPTR;
+ return state_ ? state_->detail : no_detail;
}
/// \brief Return a new Status copying the existing status, but
diff --git a/cpp/src/arrow/util/checked_cast.h
b/cpp/src/arrow/util/checked_cast.h
index 3692bd9..97f6b61 100644
--- a/cpp/src/arrow/util/checked_cast.h
+++ b/cpp/src/arrow/util/checked_cast.h
@@ -19,6 +19,7 @@
#include <memory>
#include <type_traits>
+#include <utility>
namespace arrow {
namespace internal {
@@ -39,11 +40,11 @@ inline OutputType checked_cast(InputType&& value) {
}
template <class T, class U>
-std::shared_ptr<T> checked_pointer_cast(const std::shared_ptr<U>& r) noexcept {
+std::shared_ptr<T> checked_pointer_cast(std::shared_ptr<U> r) noexcept {
#ifdef NDEBUG
- return std::static_pointer_cast<T>(r);
+ return std::static_pointer_cast<T>(std::move(r));
#else
- return std::dynamic_pointer_cast<T>(r);
+ return std::dynamic_pointer_cast<T>(std::move(r));
#endif
}
diff --git a/dev/archery/archery/cli.py b/dev/archery/archery/cli.py
index 1cbc18e..82ab56a 100644
--- a/dev/archery/archery/cli.py
+++ b/dev/archery/archery/cli.py
@@ -714,7 +714,7 @@ def docker_compose_run(obj, image, command, *, env, user,
force_pull,
dry_run, volume):
"""Execute docker-compose builds.
- To see the available builds run `archery docker list`.
+ To see the available builds run `archery docker images`.
Examples:
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index acb88e2..f4997a9 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -29,6 +29,7 @@ from pyarrow.includes.libarrow_dataset cimport *
from pyarrow._fs cimport FileSystem, FileInfo, FileSelector
from pyarrow._csv cimport ParseOptions
from pyarrow._compute cimport CastOptions
+from pyarrow.util import _is_path_like, _stringify_path
def _forbid_instantiation(klass, subclasses_instead=True):
@@ -43,6 +44,51 @@ def _forbid_instantiation(klass, subclasses_instead=True):
raise TypeError(msg)
+cdef class FileSource:
+
+ cdef:
+ CFileSource wrapped
+
+ def __cinit__(self, file, FileSystem filesystem=None):
+ cdef:
+ shared_ptr[CFileSystem] c_filesystem
+ c_string c_path
+ shared_ptr[CRandomAccessFile] c_file
+ shared_ptr[CBuffer] c_buffer
+
+ if isinstance(file, FileSource):
+ self.wrapped = (<FileSource> file).wrapped
+
+ elif isinstance(file, Buffer):
+ c_buffer = pyarrow_unwrap_buffer(file)
+ self.wrapped = CFileSource(move(c_buffer))
+
+ elif _is_path_like(file):
+ if filesystem is None:
+ raise ValueError("cannot construct a FileSource from "
+ "a path without a FileSystem")
+ c_filesystem = filesystem.unwrap()
+ c_path = tobytes(_stringify_path(file))
+ self.wrapped = CFileSource(move(c_path), move(c_filesystem))
+
+ elif hasattr(file, 'read'):
+ # Optimistically hope this is file-like
+ c_file = get_native_file(file, False).get_random_access_file()
+ self.wrapped = CFileSource(move(c_file))
+
+ else:
+ raise TypeError("cannot construct a FileSource "
+ "from " + str(file))
+
+ @staticmethod
+ def from_uri(uri):
+ filesystem, path = FileSystem.from_uri(uri)
+ return FileSource(path, filesystem)
+
+ cdef CFileSource unwrap(self) nogil:
+ return self.wrapped
+
+
cdef class Expression:
cdef:
@@ -226,16 +272,18 @@ cdef class Dataset:
@staticmethod
cdef wrap(const shared_ptr[CDataset]& sp):
- cdef Dataset self
+ type_name = frombytes(sp.get().type_name())
- typ = frombytes(sp.get().type_name())
- if typ == 'union':
- self = UnionDataset.__new__(UnionDataset)
- elif typ == 'filesystem':
- self = FileSystemDataset.__new__(FileSystemDataset)
- else:
- raise TypeError(typ)
+ classes = {
+ 'union': UnionDataset,
+ 'filesystem': FileSystemDataset,
+ }
+
+ class_ = classes.get(type_name, None)
+ if class_ is None:
+ raise TypeError(type_name)
+ cdef Dataset self = class_.__new__(class_)
self.init(sp)
return self
@@ -411,7 +459,7 @@ cdef class FileSystemDataset(Dataset):
Parameters
----------
- paths_or_selector : Union[FileSelector, List[FileInfo]]
+ paths_or_selector : Union[FileSelector, List[str]]
List of files/directories to consume.
schema : Schema
The top-level schema of the DataDataset.
@@ -457,8 +505,7 @@ cdef class FileSystemDataset(Dataset):
infos = filesystem.get_file_info(paths_or_selector)
- if partitions is None:
- partitions = [_true] * len(infos)
+ partitions = partitions or [_true] * len(infos)
if len(infos) != len(partitions):
raise ValueError(
@@ -531,50 +578,45 @@ cdef class FileFormat:
@staticmethod
cdef wrap(const shared_ptr[CFileFormat]& sp):
- cdef FileFormat self
-
- typ = frombytes(sp.get().type_name())
- if typ == 'parquet':
- self = ParquetFileFormat.__new__(ParquetFileFormat)
- elif typ == 'ipc':
- self = IpcFileFormat.__new__(IpcFileFormat)
- elif typ == 'csv':
- self = CsvFileFormat.__new__(CsvFileFormat)
- else:
- raise TypeError(typ)
+ type_name = frombytes(sp.get().type_name())
+
+ classes = {
+ 'ipc': IpcFileFormat,
+ 'csv': CsvFileFormat,
+ 'parquet': ParquetFileFormat,
+ }
+
+ class_ = classes.get(type_name, None)
+ if class_ is None:
+ raise TypeError(type_name)
+ cdef FileFormat self = class_.__new__(class_)
self.init(sp)
return self
cdef inline shared_ptr[CFileFormat] unwrap(self):
return self.wrapped
- def inspect(self, str path not None, FileSystem filesystem not None):
+ def inspect(self, file, filesystem=None):
"""Infer the schema of a file."""
- cdef:
- shared_ptr[CSchema] c_schema
-
- c_schema = GetResultValue(self.format.Inspect(CFileSource(
- tobytes(path), filesystem.unwrap())))
+ c_source = FileSource(file, filesystem).unwrap()
+ c_schema = GetResultValue(self.format.Inspect(c_source))
return pyarrow_wrap_schema(move(c_schema))
- def make_fragment(self, str path not None, FileSystem filesystem not None,
+ def make_fragment(self, file, filesystem=None,
Expression partition_expression=None):
"""
Make a FileFragment of this FileFormat. The filter may not reference
fields absent from the provided schema. If no schema is provided then
one will be inferred.
"""
- cdef:
- shared_ptr[CFileFragment] c_fragment
-
partition_expression = partition_expression or _true
- c_fragment = GetResultValue(
- self.format.MakeFragment(CFileSource(tobytes(path),
- filesystem.unwrap()),
+ c_source = FileSource(file, filesystem).unwrap()
+ c_fragment = <shared_ptr[CFragment]> GetResultValue(
+ self.format.MakeFragment(move(c_source),
partition_expression.unwrap()))
- return Fragment.wrap(<shared_ptr[CFragment]> move(c_fragment))
+ return Fragment.wrap(move(c_fragment))
def __eq__(self, other):
try:
@@ -590,26 +632,30 @@ cdef class Fragment:
shared_ptr[CFragment] wrapped
CFragment* fragment
+ def __init__(self):
+ _forbid_instantiation(self.__class__)
+
cdef void init(self, const shared_ptr[CFragment]& sp):
self.wrapped = sp
self.fragment = sp.get()
@staticmethod
cdef wrap(const shared_ptr[CFragment]& sp):
- # there's no discriminant in Fragment, so we can't downcast
- # to FileFragment for the path property
- cdef Fragment self = Fragment()
-
- typ = frombytes(sp.get().type_name())
- if typ == 'ipc':
- # IpcFileFormat does not have a corresponding subclass
- # of FileFragment
- self = FileFragment.__new__(FileFragment)
- elif typ == 'parquet':
- self = ParquetFileFragment.__new__(ParquetFileFragment)
- else:
- self = Fragment()
+ type_name = frombytes(sp.get().type_name())
+
+ classes = {
+ # IpcFileFormat and CsvFileFormat do not have corresponding
+ # subclasses of FileFragment
+ 'ipc': FileFragment,
+ 'csv': FileFragment,
+ 'parquet': ParquetFileFragment,
+ }
+
+ class_ = classes.get(type_name, None)
+ if class_ is None:
+ class_ = Fragment
+ cdef Fragment self = class_.__new__(class_)
self.init(sp)
return self
@@ -732,9 +778,13 @@ cdef class FileFragment(Fragment):
it views a file. If instead it views a buffer, this will be None.
"""
cdef:
- shared_ptr[CFileSystem] fs
- fs = self.file_fragment.source().filesystem()
- return FileSystem.wrap(fs)
+ shared_ptr[CFileSystem] c_fs
+ c_fs = self.file_fragment.source().filesystem()
+
+ if c_fs.get() == nullptr:
+ return None
+
+ return FileSystem.wrap(c_fs)
@property
def buffer(self):
@@ -941,27 +991,25 @@ cdef class ParquetFileFormat(FileFormat):
def __reduce__(self):
return ParquetFileFormat, (self.read_options,)
- def make_fragment(self, str path not None, FileSystem filesystem not None,
+ def make_fragment(self, file, filesystem=None,
Expression partition_expression=None, row_groups=None):
cdef:
- shared_ptr[CFileFragment] c_fragment
vector[int] c_row_groups
partition_expression = partition_expression or _true
if row_groups is None:
- return super().make_fragment(path, filesystem,
+ return super().make_fragment(file, filesystem,
partition_expression)
- for row_group in set(row_groups):
- c_row_groups.push_back(<int> row_group)
+ c_source = FileSource(file, filesystem).unwrap()
+ c_row_groups = [<int> row_group for row_group in set(row_groups)]
- c_fragment = GetResultValue(
- self.parquet_format.MakeFragment(CFileSource(tobytes(path),
- filesystem.unwrap()),
+ c_fragment = <shared_ptr[CFragment]> GetResultValue(
+ self.parquet_format.MakeFragment(move(c_source),
partition_expression.unwrap(),
move(c_row_groups)))
- return Fragment.wrap(<shared_ptr[CFragment]> move(c_fragment))
+ return Fragment.wrap(move(c_fragment))
cdef class IpcFileFormat(FileFormat):
@@ -1019,16 +1067,18 @@ cdef class Partitioning:
@staticmethod
cdef wrap(const shared_ptr[CPartitioning]& sp):
- cdef Partitioning self
+ type_name = frombytes(sp.get().type_name())
- typ = frombytes(sp.get().type_name())
- if typ == 'schema':
- self = DirectoryPartitioning.__new__(DirectoryPartitioning)
- elif typ == 'hive':
- self = HivePartitioning.__new__(HivePartitioning)
- else:
- raise TypeError(typ)
+ classes = {
+ 'schema': DirectoryPartitioning,
+ 'hive': HivePartitioning,
+ }
+
+ class_ = classes.get(type_name, None)
+ if class_ is None:
+ raise TypeError(type_name)
+ cdef Partitioning self = class_.__new__(class_)
self.init(sp)
return self
@@ -1430,24 +1480,23 @@ cdef class FileSystemDatasetFactory(DatasetFactory):
FileSystemFactoryOptions options=None):
cdef:
vector[c_string] paths
- CFileSelector selector
+ CFileSelector c_selector
CResult[shared_ptr[CDatasetFactory]] result
shared_ptr[CFileSystem] c_filesystem
shared_ptr[CFileFormat] c_format
CFileSystemFactoryOptions c_options
- c_filesystem = filesystem.unwrap()
- c_format = format.unwrap()
-
options = options or FileSystemFactoryOptions()
c_options = options.unwrap()
+ c_filesystem = filesystem.unwrap()
+ c_format = format.unwrap()
if isinstance(paths_or_selector, FileSelector):
with nogil:
- selector = (<FileSelector>paths_or_selector).selector
+ c_selector = (<FileSelector> paths_or_selector).selector
result = CFileSystemDatasetFactory.MakeFromSelector(
c_filesystem,
- selector,
+ c_selector,
c_format,
c_options
)
diff --git a/python/pyarrow/includes/common.pxd
b/python/pyarrow/includes/common.pxd
index d8ee866..c2ef569 100644
--- a/python/pyarrow/includes/common.pxd
+++ b/python/pyarrow/includes/common.pxd
@@ -116,13 +116,18 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
cdef extern from "arrow/result.h" namespace "arrow" nogil:
cdef cppclass CResult "arrow::Result"[T]:
+ CResult()
+ CResult(CStatus)
+ CResult(T)
c_bool ok()
CStatus status()
T operator*()
+
cdef extern from "arrow/python/common.h" namespace "arrow::py" nogil:
T GetResultValue[T](CResult[T]) except *
+
cdef inline object PyObject_to_object(PyObject* o):
# Cast to "object" increments reference count
cdef object result = <object> o
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd
b/python/pyarrow/includes/libarrow_dataset.pxd
index 56140bd..e84ff78 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -195,7 +195,11 @@ cdef extern from "arrow/dataset/api.h" namespace
"arrow::dataset" nogil:
const c_string& path() const
const shared_ptr[CFileSystem]& filesystem() const
const shared_ptr[CBuffer]& buffer() const
- CFileSource(c_string path, shared_ptr[CFileSystem] filesystem)
+ # HACK: Cython can't handle all the overloads so don't declare them.
+ # This means invalid construction of CFileSource won't be caught in
+ # the C++ generation phase (though it will still be caught when
+ # the generated C++ is compiled).
+ CFileSource(...)
cdef cppclass CFileFormat "arrow::dataset::FileFormat":
c_string type_name() const
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index 53cb5de..77ac4f8 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -1376,7 +1376,7 @@ cdef shared_ptr[CBuffer] as_c_buffer(object o) except *:
return buf
-cdef NativeFile _get_native_file(object source, c_bool use_memory_map):
+cdef NativeFile get_native_file(object source, c_bool use_memory_map):
try:
source_path = _stringify_path(source)
except TypeError:
@@ -1398,7 +1398,7 @@ cdef get_reader(object source, c_bool use_memory_map,
shared_ptr[CRandomAccessFile]* reader):
cdef NativeFile nf
- nf = _get_native_file(source, use_memory_map)
+ nf = get_native_file(source, use_memory_map)
reader[0] = nf.get_random_access_file()
@@ -1418,7 +1418,7 @@ cdef get_input_stream(object source, c_bool
use_memory_map,
except TypeError:
codec = None
- nf = _get_native_file(source, use_memory_map)
+ nf = get_native_file(source, use_memory_map)
input_stream = nf.get_input_stream()
# codec is None if compression can't be detected
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index 8b757d5..b4cd230 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -579,6 +579,7 @@ cdef get_input_stream(object source, c_bool use_memory_map,
cdef get_reader(object source, c_bool use_memory_map,
shared_ptr[CRandomAccessFile]* reader)
cdef get_writer(object source, shared_ptr[COutputStream]* writer)
+cdef NativeFile get_native_file(object source, c_bool use_memory_map)
# Default is allow_none=False
cpdef DataType ensure_type(object type, bint allow_none=*)
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index ef5c3be..c581e0f 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -1390,6 +1390,26 @@ class _ParquetDatasetV2:
"Keyword '{0}' is not yet supported with the new "
"Dataset API".format(keyword))
+ # map format arguments
+ read_options = {}
+ if buffer_size:
+ read_options.update(use_buffered_stream=True,
+ buffer_size=buffer_size)
+ if read_dictionary is not None:
+ read_options.update(dictionary_columns=read_dictionary)
+ parquet_format = ds.ParquetFileFormat(read_options=read_options)
+
+ # map filters to Expressions
+ self._filters = filters
+ self._filter_expression = filters and _filters_to_expression(filters)
+
+ # check for single NativeFile dataset
+ if not isinstance(path_or_paths, list):
+ if not _is_path_like(path_or_paths):
+ self._fragment = parquet_format.make_fragment(path_or_paths)
+ self._dataset = None
+ return
+
# map old filesystems to new one
# TODO(dataset) deal with other file systems
if isinstance(filesystem, LocalFileSystem):
@@ -1399,27 +1419,16 @@ class _ParquetDatasetV2:
# path can in principle be URI for any filesystem)
filesystem = pyarrow.fs.LocalFileSystem(use_mmap=True)
- # map additional arguments
- read_options = {}
- if buffer_size:
- read_options.update(use_buffered_stream=True,
- buffer_size=buffer_size)
- if read_dictionary is not None:
- read_options.update(dictionary_columns=read_dictionary)
- parquet_format = ds.ParquetFileFormat(read_options=read_options)
-
self._dataset = ds.dataset(path_or_paths, filesystem=filesystem,
format=parquet_format,
partitioning=partitioning)
- self._filters = filters
- if filters is not None:
- self._filter_expression = _filters_to_expression(filters)
- else:
- self._filter_expression = None
+ self._fragment = None
@property
def schema(self):
- return self._dataset.schema
+ if self._dataset is not None:
+ return self._dataset.schema
+ return self._fragment.physical_schema
def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
"""
@@ -1444,13 +1453,17 @@ class _ParquetDatasetV2:
"""
# if use_pandas_metadata, we need to include index columns in the
# column selection, to be able to restore those in the pandas DataFrame
- metadata = self._dataset.schema.metadata
+ metadata = self.schema.metadata
if columns is not None and use_pandas_metadata:
if metadata and b'pandas' in metadata:
- index_columns = set(_get_pandas_index_columns(metadata))
- columns = columns + list(index_columns - set(columns))
-
- table = self._dataset.to_table(
+ # RangeIndex can be represented as dict instead of column name
+ index_columns = [
+ col for col in _get_pandas_index_columns(metadata)
+ if not isinstance(col, dict)
+ ]
+ columns = columns + list(set(index_columns) - set(columns))
+
+ table = (self._dataset or self._fragment).to_table(
columns=columns, filter=self._filter_expression,
use_threads=use_threads
)
@@ -1521,10 +1534,6 @@ def read_table(source, columns=None, use_threads=True,
metadata=None,
read_dictionary=None, filesystem=None, filters=None,
buffer_size=0, partitioning="hive", use_legacy_dataset=True):
if not use_legacy_dataset:
- if not _is_path_like(source):
- raise ValueError("File-like objects are not yet supported with "
- "the new Dataset API")
-
dataset = _ParquetDatasetV2(
source,
filesystem=filesystem,
diff --git a/python/pyarrow/tests/test_parquet.py
b/python/pyarrow/tests/test_parquet.py
index 52b44ba..cd86df2 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -89,18 +89,11 @@ def _roundtrip_table(table, read_table_kwargs=None,
read_table_kwargs = read_table_kwargs or {}
write_table_kwargs = write_table_kwargs or {}
- if use_legacy_dataset:
- buf = io.BytesIO()
- _write_table(table, buf, **write_table_kwargs)
- buf.seek(0)
- return _read_table(buf, **read_table_kwargs)
- else:
- from pyarrow.fs import _MockFileSystem
- mockfs = _MockFileSystem()
- with mockfs.open_output_stream("test") as out:
- _write_table(table, out, **write_table_kwargs)
- return _read_table("test", filesystem=mockfs, use_legacy_dataset=False,
- **read_table_kwargs)
+ buf = io.BytesIO()
+ _write_table(table, buf, **write_table_kwargs)
+ buf.seek(0)
+ return _read_table(buf, use_legacy_dataset=use_legacy_dataset,
+ **read_table_kwargs)
def _check_roundtrip(table, expected=None, read_table_kwargs=None,
@@ -349,7 +342,7 @@ def
test_nested_list_nonnullable_roundtrip_bug(use_legacy_dataset):
@pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
def test_pandas_parquet_datetime_tz(use_legacy_dataset):
s = pd.Series([datetime.datetime(2017, 9, 6)])
s = s.dt.tz_localize('utc')
@@ -368,7 +361,7 @@ def test_pandas_parquet_datetime_tz(use_legacy_dataset):
_write_table(arrow_table, f, coerce_timestamps='ms')
f.seek(0)
- table_read = pq.read_pandas(f)
+ table_read = pq.read_pandas(f, use_legacy_dataset=use_legacy_dataset)
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)
@@ -572,9 +565,8 @@ def _test_dataframe(size=10000, seed=0):
return df
-# TODO(ARROW-8074) NativeFile support
@pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
def test_pandas_parquet_native_file_roundtrip(tempdir, use_legacy_dataset):
df = _test_dataframe(10000)
arrow_table = pa.Table.from_pandas(df)
@@ -588,7 +580,7 @@ def test_pandas_parquet_native_file_roundtrip(tempdir,
use_legacy_dataset):
@pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
def test_parquet_incremental_file_build(tempdir, use_legacy_dataset):
df = _test_dataframe(100)
df['unique_id'] = 0
@@ -617,7 +609,7 @@ def test_parquet_incremental_file_build(tempdir,
use_legacy_dataset):
@pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
def test_read_pandas_column_subset(tempdir, use_legacy_dataset):
df = _test_dataframe(10000)
arrow_table = pa.Table.from_pandas(df)
@@ -633,7 +625,7 @@ def test_read_pandas_column_subset(tempdir,
use_legacy_dataset):
@pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
def test_pandas_parquet_empty_roundtrip(tempdir, use_legacy_dataset):
df = _test_dataframe(0)
arrow_table = pa.Table.from_pandas(df)
@@ -672,7 +664,7 @@ def test_pandas_can_write_nested_data(tempdir):
@pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
def test_pandas_parquet_pyfile_roundtrip(tempdir, use_legacy_dataset):
filename = tempdir / 'pandas_pyfile_roundtrip.parquet'
size = 5
@@ -1459,7 +1451,7 @@ def test_fixed_size_binary():
@pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
def test_multithreaded_read(use_legacy_dataset):
df = alltypes_sample(size=10000)
@@ -1480,7 +1472,7 @@ def test_multithreaded_read(use_legacy_dataset):
@pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
def test_min_chunksize(use_legacy_dataset):
data = pd.DataFrame([np.arange(4)], columns=['A', 'B', 'C', 'D'])
table = pa.Table.from_pandas(data.reset_index())
@@ -3325,7 +3317,7 @@ def test_decimal_roundtrip_negative_scale(tempdir):
@pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
def test_parquet_writer_context_obj(tempdir, use_legacy_dataset):
df = _test_dataframe(100)
df['unique_id'] = 0
@@ -3352,7 +3344,7 @@ def test_parquet_writer_context_obj(tempdir,
use_legacy_dataset):
@pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
def test_parquet_writer_context_obj_with_exception(
tempdir, use_legacy_dataset
):
@@ -3388,7 +3380,7 @@ def test_parquet_writer_context_obj_with_exception(
@pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
def test_zlib_compression_bug(use_legacy_dataset):
# ARROW-3514: "zlib deflate failed, output buffer too small"
table = pa.Table.from_arrays([pa.array(['abc', 'def'])], ['some_col'])
@@ -3448,7 +3440,7 @@ def test_empty_row_groups(tempdir):
@pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
def test_parquet_writer_with_caller_provided_filesystem(use_legacy_dataset):
out = pa.BufferOutputStream()
@@ -3558,7 +3550,7 @@ def test_read_column_invalid_index():
@pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
def test_direct_read_dictionary(use_legacy_dataset):
# ARROW-3325
repeats = 10
@@ -3610,7 +3602,7 @@ def test_dataset_read_dictionary(tempdir,
use_legacy_dataset):
@pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset_not_supported # ARROW-8799
def test_direct_read_dictionary_subfield(use_legacy_dataset):
repeats = 10
nunique = 5
@@ -3726,7 +3718,7 @@ def test_parquet_file_too_small(tempdir,
use_legacy_dataset):
@pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
def test_categorical_index_survives_roundtrip(use_legacy_dataset):
# ARROW-3652, addressed by ARROW-3246
df = pd.DataFrame([['a', 'b'], ['c', 'd']], columns=['c1', 'c2'])
@@ -3743,7 +3735,7 @@ def
test_categorical_index_survives_roundtrip(use_legacy_dataset):
@pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
def test_categorical_order_survives_roundtrip(use_legacy_dataset):
# ARROW-6302
df = pd.DataFrame({"a": pd.Categorical(
@@ -3767,7 +3759,7 @@ def _simple_table_write_read(table):
return pq.read_table(pa.BufferReader(contents))
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
def test_dictionary_array_automatically_read(use_legacy_dataset):
# ARROW-3246
@@ -3833,7 +3825,7 @@ def test_field_id_metadata():
@pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
def test_pandas_categorical_na_type_row_groups(use_legacy_dataset):
# ARROW-5085
df = pd.DataFrame({"col": [None] * 100, "int": [1.0] * 100})
@@ -3853,7 +3845,7 @@ def
test_pandas_categorical_na_type_row_groups(use_legacy_dataset):
@pytest.mark.pandas
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
def test_pandas_categorical_roundtrip(use_legacy_dataset):
# ARROW-5480, this was enabled by ARROW-3246
@@ -4021,7 +4013,7 @@ def test_table_large_metadata():
_check_roundtrip(table)
-@parametrize_legacy_dataset_skip_buffer
+@parametrize_legacy_dataset
@pytest.mark.parametrize('array_factory', [
lambda: pa.array([0, None] * 10),
lambda: pa.array([0, None] * 10).dictionary_encode(),