This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 7b5912deea ARROW-14892: [Python][C++] GCS Bindings (#12763)
7b5912deea is described below
commit 7b5912deea1c0fc8e2ac801b99d2448f387fa98e
Author: emkornfield <[email protected]>
AuthorDate: Sun Jun 12 02:50:28 2022 -0700
ARROW-14892: [Python][C++] GCS Bindings (#12763)
Incorporate the GCS file system into Python, plus other bug fixes.
Bugs/Other changes:
- Add GCS bindings to Python, modeled largely on the existing AWS (S3)
bindings, together with associated unit tests (see the Python sketch after
this list)
- `Tell` was incorrect: it double-counted the offset when the stream was
constructed with an offset.
- Missed setting the define in config.cmake, which meant `FileSystemFromUri`
was never tested and didn't compile; this is now fixed.
- Refine the logic for GetFileInfo with a single path to recognize a prefix
followed by a slash as a directory. This allows datasets to work as expected
with a toy dataset generated on a local filesystem and copied to the cloud
(I believe this is typical of how other systems write to GCS as well).
- Switch the convention for creating directories to always end in "/" and
make use of this as another directory indicator. From testing with a sample
Iceberg table, this appears to be the convention used for Hive partitioning,
so I assume it is common practice for other Hive-related writers (i.e. what
we want to support).
- Fix a bug introduced in
https://github.com/apache/arrow/commit/a5e45cecb24229433b825dac64e0ffd10d400e8c
which caused failures when a deletion occurred on a bucket (rather than on an
object in the bucket).
- Ensure output streams are closed on destruction (this is consistent with
S3).
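As a quick illustration of the new bindings, here is a minimal Python sketch
(the bucket name and object prefix are hypothetical, and anonymous access is
used only to keep the sketch self-contained):

    import pyarrow.fs as pafs

    # Connect anonymously; this skips the standard GCP credential lookup.
    fs = pafs.GcsFileSystem(anonymous=True)

    # FileSystemFromUri now handles "gs://" and "gcs://" URIs, since the
    # ARROW_GCS define is set in config.cmake.
    fs2, path = pafs.FileSystem.from_uri("gs://anonymous@my-bucket/my/prefix")

    # A prefix with at least one object under it (e.g. "my/prefix/part-0")
    # is now reported as a directory, so datasets written on a local
    # filesystem and copied to GCS are recognized.
    info = fs.get_file_info("my-bucket/my/prefix")
    assert info.type == pafs.FileType.Directory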
Lead-authored-by: Micah Kornfield <[email protected]>
Co-authored-by: emkornfield <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
.github/workflows/python.yml | 1 +
.travis.yml | 7 +-
appveyor.yml | 1 +
ci/docker/conda-python.dockerfile | 5 +
ci/docker/debian-11-cpp.dockerfile | 2 +
ci/docker/fedora-35-cpp.dockerfile | 2 +
ci/docker/linux-apt-docs.dockerfile | 1 +
ci/docker/python-wheel-manylinux-test.dockerfile | 3 +
ci/docker/ubuntu-20.04-cpp.dockerfile | 1 +
ci/docker/ubuntu-22.04-cpp.dockerfile | 2 +
ci/scripts/install_gcs_testbench.sh | 22 ++-
ci/scripts/python_build.sh | 1 +
ci/scripts/python_test.sh | 2 +
ci/scripts/python_wheel_macos_build.sh | 3 +-
ci/scripts/python_wheel_manylinux_build.sh | 3 +-
ci/scripts/python_wheel_unix_test.sh | 15 +-
cpp/src/arrow/filesystem/api.h | 7 +-
cpp/src/arrow/filesystem/filesystem.cc | 3 +-
cpp/src/arrow/filesystem/filesystem.h | 6 +-
cpp/src/arrow/filesystem/gcsfs.cc | 199 +++++++++++++++------
cpp/src/arrow/filesystem/gcsfs.h | 41 ++++-
cpp/src/arrow/filesystem/gcsfs_internal.cc | 5 +-
cpp/src/arrow/filesystem/gcsfs_test.cc | 119 ++++++++++--
cpp/src/arrow/util/config.h.cmake | 1 +
dev/archery/archery/cli.py | 2 +
dev/archery/archery/lang/cpp.py | 7 +-
dev/release/verify-release-candidate.sh | 13 +-
dev/tasks/conda-recipes/arrow-cpp/bld-pyarrow.bat | 1 +
dev/tasks/conda-recipes/arrow-cpp/build-pyarrow.sh | 1 +
dev/tasks/homebrew-formulae/apache-arrow.rb | 2 +-
dev/tasks/python-wheels/github.osx.amd64.yml | 4 +-
dev/tasks/python-wheels/github.osx.arm64.yml | 4 +
dev/tasks/tasks.yml | 7 +-
python/CMakeLists.txt | 4 +
python/asv-build.sh | 2 +
python/pyarrow/_fs.pyx | 3 +
python/pyarrow/_gcsfs.pyx | 188 +++++++++++++++++++
python/pyarrow/conftest.py | 9 +
python/pyarrow/fs.py | 5 +
python/pyarrow/includes/libarrow.pxd | 7 +
python/pyarrow/includes/libarrow_fs.pxd | 34 ++++
python/pyarrow/tests/conftest.py | 21 +++
python/pyarrow/tests/test_fs.py | 95 +++++++++-
python/pyarrow/tests/test_pandas.py | 4 +-
python/pyarrow/types.pxi | 2 +-
python/setup.py | 8 +
46 files changed, 772 insertions(+), 103 deletions(-)
diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml
index b14559d12a..c3c6b15186 100644
--- a/.github/workflows/python.yml
+++ b/.github/workflows/python.yml
@@ -122,6 +122,7 @@ jobs:
ARROW_DATASET: ON
ARROW_FLIGHT: ON
ARROW_GANDIVA: ON
+ ARROW_GCS: OFF
ARROW_HDFS: ON
ARROW_JEMALLOC: ON
ARROW_ORC: ON
diff --git a/.travis.yml b/.travis.yml
index f906ba8686..b3aa724107 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -60,7 +60,6 @@ jobs:
DOCKER_RUN_ARGS: >-
"
-e ARROW_BUILD_STATIC=OFF
- -e ARROW_GCS=OFF
-e ARROW_ORC=OFF
-e ARROW_USE_GLOG=OFF
-e CMAKE_UNITY_BUILD=ON
@@ -99,11 +98,11 @@ jobs:
-e ARROW_GCS=OFF
-e ARROW_MIMALLOC=OFF
-e ARROW_ORC=OFF
- -e ARROW_SUBSTRAIT=OFF
-e ARROW_PARQUET=OFF
-e ARROW_S3=OFF
- -e CMAKE_UNITY_BUILD=ON
+ -e ARROW_SUBSTRAIT=OFF
-e CMAKE_BUILD_PARALLEL_LEVEL=2
+ -e CMAKE_UNITY_BUILD=ON
-e PARQUET_BUILD_EXAMPLES=OFF
-e PARQUET_BUILD_EXECUTABLES=OFF
-e Protobuf_SOURCE=BUNDLED
@@ -154,8 +153,8 @@ jobs:
-e ARROW_PARQUET=OFF
-e ARROW_PYTHON=ON
-e ARROW_S3=OFF
- -e CMAKE_UNITY_BUILD=ON
-e CMAKE_BUILD_PARALLEL_LEVEL=2
+ -e CMAKE_UNITY_BUILD=ON
-e PARQUET_BUILD_EXAMPLES=OFF
-e PARQUET_BUILD_EXECUTABLES=OFF
-e Protobuf_SOURCE=BUNDLED
diff --git a/appveyor.yml b/appveyor.yml
index 8342dbf6cb..2699e479b7 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -57,6 +57,7 @@ environment:
# (as generated by cmake)
- JOB: "Toolchain"
GENERATOR: Ninja
+ ARROW_GCS: "ON"
ARROW_S3: "ON"
ARROW_BUILD_FLIGHT: "ON"
ARROW_BUILD_GANDIVA: "ON"
diff --git a/ci/docker/conda-python.dockerfile b/ci/docker/conda-python.dockerfile
index 5ef69431b8..18106d8b18 100644
--- a/ci/docker/conda-python.dockerfile
+++ b/ci/docker/conda-python.dockerfile
@@ -32,6 +32,11 @@ RUN mamba install -q \
nomkl && \
mamba clean --all
+# XXX The GCS testbench was already installed in conda-cpp.dockerfile,
+# but we changed the installed Python version above, so we need to reinstall it.
+COPY ci/scripts/install_gcs_testbench.sh /arrow/ci/scripts
+RUN /arrow/ci/scripts/install_gcs_testbench.sh default
+
ENV ARROW_PYTHON=ON \
ARROW_BUILD_STATIC=OFF \
ARROW_BUILD_TESTS=OFF \
diff --git a/ci/docker/debian-11-cpp.dockerfile b/ci/docker/debian-11-cpp.dockerfile
index 1bb67e334e..dfccd85e55 100644
--- a/ci/docker/debian-11-cpp.dockerfile
+++ b/ci/docker/debian-11-cpp.dockerfile
@@ -83,6 +83,7 @@ ENV ARROW_BUILD_TESTS=ON \
ARROW_DEPENDENCY_SOURCE=SYSTEM \
ARROW_FLIGHT=ON \
ARROW_GANDIVA=ON \
+ ARROW_GCS=ON \
ARROW_HOME=/usr/local \
ARROW_ORC=ON \
ARROW_PARQUET=ON \
@@ -99,6 +100,7 @@ ENV ARROW_BUILD_TESTS=ON \
AWSSDK_SOURCE=BUNDLED \
CC=gcc \
CXX=g++ \
+ google_cloud_cpp_storage_SOURCE=BUNDLED \
ORC_SOURCE=BUNDLED \
PATH=/usr/lib/ccache/:$PATH \
Protobuf_SOURCE=BUNDLED
diff --git a/ci/docker/fedora-35-cpp.dockerfile b/ci/docker/fedora-35-cpp.dockerfile
index b79ceb894b..947c9aba1b 100644
--- a/ci/docker/fedora-35-cpp.dockerfile
+++ b/ci/docker/fedora-35-cpp.dockerfile
@@ -77,6 +77,7 @@ ENV ARROW_BUILD_TESTS=ON \
ARROW_FLIGHT=ON \
ARROW_GANDIVA_JAVA=ON \
ARROW_GANDIVA=ON \
+ ARROW_GCS=ON \
ARROW_HOME=/usr/local \
ARROW_ORC=ON \
ARROW_PARQUET=ON \
@@ -92,6 +93,7 @@ ENV ARROW_BUILD_TESTS=ON \
AWSSDK_SOURCE=BUNDLED \
CC=gcc \
CXX=g++ \
+ google_cloud_cpp_storage_SOURCE=BUNDLED \
ORC_SOURCE=BUNDLED \
PARQUET_BUILD_EXECUTABLES=ON \
PARQUET_BUILD_EXAMPLES=ON \
diff --git a/ci/docker/linux-apt-docs.dockerfile b/ci/docker/linux-apt-docs.dockerfile
index 3a8a9cf8e2..c1ee003f4f 100644
--- a/ci/docker/linux-apt-docs.dockerfile
+++ b/ci/docker/linux-apt-docs.dockerfile
@@ -97,6 +97,7 @@ ENV ARROW_BUILD_STATIC=OFF \
ARROW_BUILD_TESTS=OFF \
ARROW_BUILD_UTILITIES=OFF \
ARROW_FLIGHT=ON \
+ ARROW_GCS=ON \
ARROW_GLIB_VALA=false \
ARROW_PYTHON=ON \
ARROW_S3=ON \
diff --git a/ci/docker/python-wheel-manylinux-test.dockerfile b/ci/docker/python-wheel-manylinux-test.dockerfile
index 55c27d1d7b..cdd0ae3ced 100644
--- a/ci/docker/python-wheel-manylinux-test.dockerfile
+++ b/ci/docker/python-wheel-manylinux-test.dockerfile
@@ -25,3 +25,6 @@ FROM ${arch}/python:${python}
# test dependencies in a docker image
COPY python/requirements-wheel-test.txt /arrow/python/
RUN pip install -r /arrow/python/requirements-wheel-test.txt
+
+COPY ci/scripts/install_gcs_testbench.sh /arrow/ci/scripts/
+RUN PYTHON=python /arrow/ci/scripts/install_gcs_testbench.sh default
diff --git a/ci/docker/ubuntu-20.04-cpp.dockerfile b/ci/docker/ubuntu-20.04-cpp.dockerfile
index 6e811ea2f7..0eade393cd 100644
--- a/ci/docker/ubuntu-20.04-cpp.dockerfile
+++ b/ci/docker/ubuntu-20.04-cpp.dockerfile
@@ -96,6 +96,7 @@ RUN apt-get update -y -q && \
nlohmann-json3-dev \
pkg-config \
protobuf-compiler \
+ python3-dev \
python3-pip \
python3-rados \
rados-objclass-dev \
diff --git a/ci/docker/ubuntu-22.04-cpp.dockerfile b/ci/docker/ubuntu-22.04-cpp.dockerfile
index a7cc5ff38a..1398dcd636 100644
--- a/ci/docker/ubuntu-22.04-cpp.dockerfile
+++ b/ci/docker/ubuntu-22.04-cpp.dockerfile
@@ -156,6 +156,7 @@ ENV ARROW_BUILD_TESTS=ON \
ARROW_FLIGHT=ON \
ARROW_FLIGHT_SQL=ON \
ARROW_GANDIVA=ON \
+ ARROW_GCS=ON \
ARROW_HDFS=ON \
ARROW_HOME=/usr/local \
ARROW_INSTALL_NAME_RPATH=OFF \
@@ -175,6 +176,7 @@ ENV ARROW_BUILD_TESTS=ON \
ARROW_WITH_ZLIB=ON \
ARROW_WITH_ZSTD=ON \
AWSSDK_SOURCE=BUNDLED \
+ google_cloud_cpp_storage_SOURCE=BUNDLED \
GTest_SOURCE=BUNDLED \
ORC_SOURCE=BUNDLED \
PARQUET_BUILD_EXAMPLES=ON \
diff --git a/ci/scripts/install_gcs_testbench.sh b/ci/scripts/install_gcs_testbench.sh
index 0282e0fda5..c7a6ee7a6d 100755
--- a/ci/scripts/install_gcs_testbench.sh
+++ b/ci/scripts/install_gcs_testbench.sh
@@ -24,10 +24,24 @@ if [ "$#" -ne 1 ]; then
exit 1
fi
-if [ "$(uname -m)" != "x86_64" ]; then
- echo "GCS testbench won't install on non-x86 architecture"
- exit 0
-fi
+case "$(uname -m)" in
+ aarch64|arm64|x86_64)
+ : # OK
+ ;;
+ *)
+ echo "GCS testbench is installed only on x86 or arm architectures: $(uname
-m)"
+ exit 0
+ ;;
+esac
+
+case "$(uname -s)-$(uname -m)" in
+ Darwin-arm64)
+ # Workaround for https://github.com/grpc/grpc/issues/28387 .
+ # Build grpcio instead of using wheel.
+ # storage-testbench 0.16.0 pins grpcio to 1.44.0.
+ ${PYTHON:-python3} -m pip install --no-binary :all: "grpcio==1.44.0"
+ ;;
+esac
version=$1
if [[ "${version}" -eq "default" ]]; then
diff --git a/ci/scripts/python_build.sh b/ci/scripts/python_build.sh
index b90321643c..cfac68bd6e 100755
--- a/ci/scripts/python_build.sh
+++ b/ci/scripts/python_build.sh
@@ -58,6 +58,7 @@ export PYARROW_WITH_CUDA=${ARROW_CUDA:-OFF}
export PYARROW_WITH_DATASET=${ARROW_DATASET:-ON}
export PYARROW_WITH_FLIGHT=${ARROW_FLIGHT:-OFF}
export PYARROW_WITH_GANDIVA=${ARROW_GANDIVA:-OFF}
+export PYARROW_WITH_GCS=${ARROW_GCS:-OFF}
export PYARROW_WITH_HDFS=${ARROW_HDFS:-ON}
export PYARROW_WITH_ORC=${ARROW_ORC:-OFF}
export PYARROW_WITH_PLASMA=${ARROW_PLASMA:-OFF}
diff --git a/ci/scripts/python_test.sh b/ci/scripts/python_test.sh
index e1d06c1872..4e2990b84d 100755
--- a/ci/scripts/python_test.sh
+++ b/ci/scripts/python_test.sh
@@ -38,6 +38,7 @@ export ARROW_DEBUG_MEMORY_POOL=trap
: ${PYARROW_TEST_DATASET:=${ARROW_DATASET:-ON}}
: ${PYARROW_TEST_FLIGHT:=${ARROW_FLIGHT:-ON}}
: ${PYARROW_TEST_GANDIVA:=${ARROW_GANDIVA:-ON}}
+: ${PYARROW_TEST_GCS:=${ARROW_GCS:-ON}}
: ${PYARROW_TEST_HDFS:=${ARROW_HDFS:-ON}}
: ${PYARROW_TEST_ORC:=${ARROW_ORC:-ON}}
: ${PYARROW_TEST_PARQUET:=${ARROW_PARQUET:-ON}}
@@ -47,6 +48,7 @@ export PYARROW_TEST_CUDA
export PYARROW_TEST_DATASET
export PYARROW_TEST_FLIGHT
export PYARROW_TEST_GANDIVA
+export PYARROW_TEST_GCS
export PYARROW_TEST_HDFS
export PYARROW_TEST_ORC
export PYARROW_TEST_PARQUET
diff --git a/ci/scripts/python_wheel_macos_build.sh b/ci/scripts/python_wheel_macos_build.sh
index b3ae912dff..7fa43a3eaa 100755
--- a/ci/scripts/python_wheel_macos_build.sh
+++ b/ci/scripts/python_wheel_macos_build.sh
@@ -64,7 +64,7 @@ echo "=== (${PYTHON_VERSION}) Building Arrow C++ libraries ==="
: ${ARROW_DATASET:=ON}
: ${ARROW_FLIGHT:=ON}
: ${ARROW_GANDIVA:=OFF}
-: ${ARROW_GCS:=OFF}
+: ${ARROW_GCS:=ON}
: ${ARROW_HDFS:=ON}
: ${ARROW_JEMALLOC:=ON}
: ${ARROW_MIMALLOC:=ON}
@@ -148,6 +148,7 @@ export PYARROW_INSTALL_TESTS=1
export PYARROW_WITH_DATASET=${ARROW_DATASET}
export PYARROW_WITH_FLIGHT=${ARROW_FLIGHT}
export PYARROW_WITH_GANDIVA=${ARROW_GANDIVA}
+export PYARROW_WITH_GCS=${ARROW_GCS}
export PYARROW_WITH_HDFS=${ARROW_HDFS}
export PYARROW_WITH_ORC=${ARROW_ORC}
export PYARROW_WITH_PARQUET=${ARROW_PARQUET}
diff --git a/ci/scripts/python_wheel_manylinux_build.sh b/ci/scripts/python_wheel_manylinux_build.sh
index d242fe657c..6cfd34d851 100755
--- a/ci/scripts/python_wheel_manylinux_build.sh
+++ b/ci/scripts/python_wheel_manylinux_build.sh
@@ -51,7 +51,7 @@ echo "=== (${PYTHON_VERSION}) Building Arrow C++ libraries ==="
: ${ARROW_DATASET:=ON}
: ${ARROW_FLIGHT:=ON}
: ${ARROW_GANDIVA:=OFF}
-: ${ARROW_GCS:=OFF}
+: ${ARROW_GCS:=ON}
: ${ARROW_HDFS:=ON}
: ${ARROW_JEMALLOC:=ON}
: ${ARROW_MIMALLOC:=ON}
@@ -144,6 +144,7 @@ export PYARROW_INSTALL_TESTS=1
export PYARROW_WITH_DATASET=${ARROW_DATASET}
export PYARROW_WITH_FLIGHT=${ARROW_FLIGHT}
export PYARROW_WITH_GANDIVA=${ARROW_GANDIVA}
+export PYARROW_WITH_GCS=${ARROW_GCS}
export PYARROW_WITH_HDFS=${ARROW_HDFS}
export PYARROW_WITH_ORC=${ARROW_ORC}
export PYARROW_WITH_PARQUET=${ARROW_PARQUET}
diff --git a/ci/scripts/python_wheel_unix_test.sh b/ci/scripts/python_wheel_unix_test.sh
index 99436e0c1f..2b2fe9cdf1 100755
--- a/ci/scripts/python_wheel_unix_test.sh
+++ b/ci/scripts/python_wheel_unix_test.sh
@@ -31,6 +31,7 @@ source_dir=${1}
: ${ARROW_FLIGHT:=ON}
: ${ARROW_SUBSTRAIT:=ON}
: ${ARROW_S3:=ON}
+: ${ARROW_GCS:=ON}
: ${CHECK_IMPORTS:=ON}
: ${CHECK_UNITTESTS:=ON}
: ${INSTALL_PYARROW:=ON}
@@ -39,6 +40,7 @@ export PYARROW_TEST_CYTHON=OFF
export PYARROW_TEST_DATASET=ON
export PYARROW_TEST_FLIGHT=${ARROW_FLIGHT}
export PYARROW_TEST_GANDIVA=OFF
+export PYARROW_TEST_GCS=${ARROW_GCS}
export PYARROW_TEST_HDFS=ON
export PYARROW_TEST_ORC=ON
export PYARROW_TEST_PANDAS=ON
@@ -69,6 +71,9 @@ import pyarrow.orc
import pyarrow.parquet
import pyarrow.plasma
"
+ if [ "${PYARROW_TEST_GCS}" == "ON" ]; then
+ python -c "import pyarrow._gcsfs"
+ fi
if [ "${PYARROW_TEST_S3}" == "ON" ]; then
python -c "import pyarrow._s3fs"
fi
@@ -81,8 +86,14 @@ import pyarrow.plasma
fi
if [ "${CHECK_UNITTESTS}" == "ON" ]; then
- # Install testing dependencies
- pip install -U -r ${source_dir}/python/requirements-wheel-test.txt
+ # Generally, we should install testing dependencies here to install
+ # built wheels without testing dependencies. Testing dependencies are
+ # installed in ci/docker/python-wheel-manylinux-test.dockerfile to
+ # reduce test time.
+ #
+ # We also need to update dev/tasks/python-wheels/*.yml when we need
+ # to add more steps to prepare testing dependencies.
+
# Execute unittest, test dependencies must be installed
python -c 'import pyarrow; pyarrow.create_library_symlinks()'
python -m pytest -r s --pyargs pyarrow
diff --git a/cpp/src/arrow/filesystem/api.h b/cpp/src/arrow/filesystem/api.h
index 5b0c97d150..732be5f928 100644
--- a/cpp/src/arrow/filesystem/api.h
+++ b/cpp/src/arrow/filesystem/api.h
@@ -21,8 +21,11 @@
#include "arrow/filesystem/filesystem.h" // IWYU pragma: export
#include "arrow/filesystem/hdfs.h" // IWYU pragma: export
-#include "arrow/filesystem/localfs.h" // IWYU pragma: export
-#include "arrow/filesystem/mockfs.h" // IWYU pragma: export
+#ifdef ARROW_GCS
+#include "arrow/filesystem/gcsfs.h" // IWYU pragma: export
+#endif
+#include "arrow/filesystem/localfs.h" // IWYU pragma: export
+#include "arrow/filesystem/mockfs.h" // IWYU pragma: export
#ifdef ARROW_S3
#include "arrow/filesystem/s3fs.h" // IWYU pragma: export
#endif
diff --git a/cpp/src/arrow/filesystem/filesystem.cc b/cpp/src/arrow/filesystem/filesystem.cc
index 18b39125f5..48b4646bea 100644
--- a/cpp/src/arrow/filesystem/filesystem.cc
+++ b/cpp/src/arrow/filesystem/filesystem.cc
@@ -695,8 +695,7 @@ Result<std::shared_ptr<FileSystem>> FileSystemFromUriReal(const Uri& uri,
if (scheme == "gs" || scheme == "gcs") {
#ifdef ARROW_GCS
ARROW_ASSIGN_OR_RAISE(auto options, GcsOptions::FromUri(uri, out_path));
- ARROW_ASSIGN_OR_RAISE(auto gcsfs, GcsFileSystem::Make(options, io_context));
- return gcsfs;
+ return GcsFileSystem::Make(options, io_context);
#else
return Status::NotImplemented("Got GCS URI but Arrow compiled without GCS support");
#endif
diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h
index dfa2b74008..6dc18d7de8 100644
--- a/cpp/src/arrow/filesystem/filesystem.h
+++ b/cpp/src/arrow/filesystem/filesystem.h
@@ -452,7 +452,8 @@ class ARROW_EXPORT SlowFileSystem : public FileSystem {
/// \brief Create a new FileSystem by URI
///
-/// Recognized schemes are "file", "mock", "hdfs" and "s3fs".
+/// Recognized schemes are "file", "mock", "hdfs", "viewfs", "s3",
+/// "gs" and "gcs".
///
/// \param[in] uri a URI-based path, ex: file:///some/local/path
/// \param[out] out_path (optional) Path inside the filesystem.
@@ -463,7 +464,8 @@ Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string& uri,
/// \brief Create a new FileSystem by URI with a custom IO context
///
-/// Recognized schemes are "file", "mock", "hdfs" and "s3fs".
+/// Recognized schemes are "file", "mock", "hdfs", "viewfs", "s3",
+/// "gs" and "gcs".
///
/// \param[in] uri a URI-based path, ex: file:///some/local/path
/// \param[in] io_context an IOContext which will be associated with the filesystem
diff --git a/cpp/src/arrow/filesystem/gcsfs.cc b/cpp/src/arrow/filesystem/gcsfs.cc
index 9bd1b15b99..82d2439a60 100644
--- a/cpp/src/arrow/filesystem/gcsfs.cc
+++ b/cpp/src/arrow/filesystem/gcsfs.cc
@@ -19,11 +19,13 @@
#include <google/cloud/storage/client.h>
#include <algorithm>
+#include <chrono>
#include "arrow/buffer.h"
#include "arrow/filesystem/gcsfs_internal.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/filesystem/util_internal.h"
+#include "arrow/io/util_internal.h"
#include "arrow/result.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/thread_pool.h"
@@ -33,13 +35,23 @@
namespace arrow {
namespace fs {
-struct GcsCredentials {
- explicit GcsCredentials(std::shared_ptr<google::cloud::Credentials> c)
- : credentials(std::move(c)) {}
-
+struct GcsCredentialsHolder {
+ // Constructor needed for make_shared
+ explicit GcsCredentialsHolder(std::shared_ptr<google::cloud::Credentials> credentials)
+ : credentials(std::move(credentials)) {}
std::shared_ptr<google::cloud::Credentials> credentials;
};
+bool GcsCredentials::Equals(const GcsCredentials& other) const {
+ if (holder_->credentials == other.holder_->credentials) {
+ return true;
+ }
+ return anonymous_ == other.anonymous_ && access_token_ == other.access_token_ &&
+ expiration_ == other.expiration_ &&
+ json_credentials_ == other.json_credentials_ &&
+ target_service_account_ == other.target_service_account_;
+}
+
namespace {
namespace gcs = google::cloud::storage;
@@ -95,12 +107,10 @@ struct GcsPath {
class GcsInputStream : public arrow::io::InputStream {
public:
explicit GcsInputStream(gcs::ObjectReadStream stream, GcsPath path,
- gcs::Generation generation, gcs::ReadFromOffset offset,
- gcs::Client client)
+ gcs::Generation generation, gcs::Client client)
: stream_(std::move(stream)),
path_(std::move(path)),
generation_(generation),
- offset_(offset.value_or(0)),
client_(std::move(client)) {}
~GcsInputStream() override = default;
@@ -115,7 +125,7 @@ class GcsInputStream : public arrow::io::InputStream {
Result<int64_t> Tell() const override {
if (closed()) return Status::Invalid("Cannot use Tell() on a closed stream");
- return stream_.tellg() + offset_;
+ return stream_.tellg();
}
// A gcs::ObjectReadStream can be "born closed". For small objects the stream returns
@@ -156,7 +166,6 @@ class GcsInputStream : public arrow::io::InputStream {
mutable gcs::ObjectReadStream stream_;
GcsPath path_;
gcs::Generation generation_;
- std::int64_t offset_;
gcs::Client client_;
bool closed_ = false;
};
@@ -164,9 +173,17 @@ class GcsInputStream : public arrow::io::InputStream {
class GcsOutputStream : public arrow::io::OutputStream {
public:
explicit GcsOutputStream(gcs::ObjectWriteStream stream) : stream_(std::move(stream)) {}
- ~GcsOutputStream() override = default;
+ ~GcsOutputStream() {
+ if (!closed_) {
+ // The common pattern is to close OutputStreams from destructor in arrow.
+ io::internal::CloseFromDestructor(this);
+ }
+ }
Status Close() override {
+ if (closed_) {
+ return Status::OK();
+ }
stream_.Close();
closed_ = true;
return internal::ToArrowStatus(stream_.last_status());
@@ -297,8 +314,15 @@ google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o) {
if (!o.endpoint_override.empty()) {
options.set<gcs::RestEndpointOption>(scheme + "://" + o.endpoint_override);
}
- if (o.credentials && o.credentials->credentials) {
- options.set<google::cloud::UnifiedCredentialsOption>(o.credentials->credentials);
+ if (o.credentials.holder() && o.credentials.holder()->credentials) {
+ options.set<google::cloud::UnifiedCredentialsOption>(
+ o.credentials.holder()->credentials);
+ }
+ if (o.retry_limit_seconds.has_value()) {
+ options.set<gcs::RetryPolicyOption>(
+ gcs::LimitedTimeRetryPolicy(
+ std::chrono::milliseconds(static_cast<int>(*o.retry_limit_seconds * 1000)))
+ .clone());
}
return options;
}
@@ -318,19 +342,44 @@ class GcsFileSystem::Impl {
return GetFileInfoBucket(path, std::move(meta).status());
}
auto meta = client_.GetObjectMetadata(path.bucket, path.object);
- return GetFileInfoObject(path, meta);
+ Result<FileInfo> info = GetFileInfoObject(path, meta);
+ if (!info.ok() || info->type() != FileType::NotFound) {
+ return info;
+ }
+ // Not found case. It could be this was written to GCS with a different
+ // "Directory" convention, so if there is at least one object that
+ // matches the prefix we assume it is a directory.
+ std::string canonical = internal::EnsureTrailingSlash(path.object);
+ auto list_result = client_.ListObjects(path.bucket, gcs::Prefix(canonical));
+ if (list_result.begin() != list_result.end()) {
+ // If there is at least one result it indicates this is a directory (at
+ // least one object exists that starts with "path/")
+ return FileInfo(path.full_path, FileType::Directory);
+ }
+ // Return the original not-found info if there was no match.
+ return info;
}
Result<FileInfoVector> GetFileInfo(const FileSelector& select) {
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(select.base_dir));
- // Adding the trailing '/' avoids problems with files named 'a', 'ab', 'ac' where GCS
- // would return all of them if the prefix is 'a'.
+ // Adding the trailing '/' avoids problems with files named 'a', 'ab', 'ac' where
+ // GCS would return all of them if the prefix is 'a'.
const auto canonical = internal::EnsureTrailingSlash(p.object);
- const auto max_depth = internal::Depth(canonical) + select.max_recursion;
+ // Need to add one level when the object is not empty because all
+ // directories have an extra slash.
+ const auto max_depth =
+ internal::Depth(canonical) + select.max_recursion + !p.object.empty();
auto prefix = p.object.empty() ? gcs::Prefix() : gcs::Prefix(canonical);
auto delimiter = select.recursive ? gcs::Delimiter() : gcs::Delimiter("/");
+ // Include trailing delimiters ensures that files matching "directory"
+ // conventions are also included in the listing.
+ // Only included for select.recursive false because a delimiter needs
+ // to be specified.
+ auto include_trailing = select.recursive ? gcs::IncludeTrailingDelimiter(false)
+ : gcs::IncludeTrailingDelimiter(true);
FileInfoVector result;
- for (auto const& o : client_.ListObjects(p.bucket, prefix, delimiter)) {
+ for (auto const& o :
+ client_.ListObjects(p.bucket, prefix, delimiter, include_trailing)) {
if (!o.ok()) {
if (select.allow_not_found &&
o.status().code() == google::cloud::StatusCode::kNotFound) {
@@ -340,11 +389,11 @@ class GcsFileSystem::Impl {
}
// Skip the directory itself from the results, and any result that is "too deep"
// into the recursion.
- if (o->name() == p.object || internal::Depth(o->name()) > max_depth) {
+ if (o->name() == canonical || internal::Depth(o->name()) > max_depth) {
continue;
}
auto path = internal::ConcatAbstractPath(o->bucket(), o->name());
- result.push_back(ToFileInfo(path, *o));
+ result.push_back(ToFileInfo(path, *o, /*normalize_directories=*/true));
}
// Finding any elements indicates the directory was found.
if (!result.empty() || select.allow_not_found) {
@@ -365,7 +414,7 @@ class GcsFileSystem::Impl {
google::cloud::StatusOr<gcs::ObjectMetadata> CreateDirMarker(const std::string& bucket,
util::string_view name) {
// Make the name canonical.
- const auto canonical = internal::RemoveTrailingSlash(name).to_string();
+ const auto canonical = internal::EnsureTrailingSlash(name);
google::cloud::StatusOr<gcs::ObjectMetadata> object = client_.InsertObject(
bucket, canonical, std::string(),
gcs::WithObjectMetadata(
@@ -398,6 +447,13 @@ class GcsFileSystem::Impl {
if (o) {
if (IsDirectory(*o)) break;
return NotDirectoryError(*o);
+ } else {
+ // If we didn't find the raw path, check if there is an entry
+ // ending in a slash.
+ o = client_.GetObjectMetadata(bucket, internal::EnsureTrailingSlash(dir));
+ if (o) {
+ break;
+ }
}
missing_parents.push_back(dir);
}
@@ -430,15 +486,17 @@ class GcsFileSystem::Impl {
Status CreateDir(const GcsPath& p) {
if (p.object.empty()) {
- return internal::ToArrowStatus(
- client_
- .CreateBucket(p.bucket, gcs::BucketMetadata().set_location(
- options_.default_bucket_location))
- .status());
+ auto metadata =
+ gcs::BucketMetadata().set_location(options_.default_bucket_location);
+ return internal::ToArrowStatus(client_.CreateBucket(p.bucket, metadata).status());
}
auto parent = p.parent();
if (!parent.object.empty()) {
- auto o = client_.GetObjectMetadata(p.bucket, parent.object);
+ auto o = client_.GetObjectMetadata(p.bucket,
+ internal::EnsureTrailingSlash(parent.object));
+ if (!o.ok()) {
+ return internal::ToArrowStatus(o.status());
+ }
if (!IsDirectory(*o)) return NotDirectoryError(*o);
}
return internal::ToArrowStatus(CreateDirMarker(p.bucket, p.object).status());
@@ -451,7 +509,8 @@ class GcsFileSystem::Impl {
Status DeleteDir(const GcsPath& p, const io::IOContext& io_context) {
RETURN_NOT_OK(DeleteDirContents(p, /*missing_dir_ok=*/false, io_context));
if (!p.object.empty()) {
- return internal::ToArrowStatus(client_.DeleteObject(p.bucket, p.object));
+ auto canonical = std::string(internal::EnsureTrailingSlash(p.object));
+ return internal::ToArrowStatus(client_.DeleteObject(p.bucket, canonical));
}
return internal::ToArrowStatus(client_.DeleteBucket(p.bucket));
}
@@ -484,7 +543,7 @@ class GcsFileSystem::Impl {
submitted.push_back(DeferNotOk(io_context.executor()->Submit(async_delete, o)));
}
- if (!missing_dir_ok && !at_least_one_obj && !dir) {
+ if (!missing_dir_ok && !at_least_one_obj && !dir && !p.object.empty()) {
// No files were found and no directory marker exists
return Status::IOError("No such directory: ", p.full_path);
}
@@ -544,8 +603,7 @@ class GcsFileSystem::Impl {
gcs::ReadFromOffset offset) {
auto stream = client_.ReadObject(path.bucket, path.object, generation, offset);
ARROW_GCS_RETURN_NOT_OK(stream.status());
- return std::make_shared<GcsInputStream>(std::move(stream), path,
gcs::Generation(),
- offset, client_);
+ return std::make_shared<GcsInputStream>(std::move(stream), path,
generation, client_);
}
Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
@@ -603,10 +661,25 @@ class GcsFileSystem::Impl {
return internal::ToArrowStatus(meta.status());
}
+ // The normalize_directories parameter is needed because
+ // how a directory is listed. If a specific path is asked
+ // for with a trailing slash it is expected to have a trailing
+ // slash [1] but for recursive listings it is expected that
+ // directories have their path normalized [2].
+ // [1]
+ // https://github.com/apache/arrow/blob/3eaa7dd0e8b3dabc5438203331f05e3e6c011e37/python/pyarrow/tests/test_fs.py#L688
+ // [2]
+ // https://github.com/apache/arrow/blob/3eaa7dd0e8b3dabc5438203331f05e3e6c011e37/cpp/src/arrow/filesystem/test_util.cc#L767
static FileInfo ToFileInfo(const std::string& full_path,
- const gcs::ObjectMetadata& meta) {
- if (IsDirectory(meta)) {
- return FileInfo(full_path, FileType::Directory);
+ const gcs::ObjectMetadata& meta,
+ bool normalize_directories = false) {
+ if (IsDirectory(meta) || (!full_path.empty() && full_path.back() == '/')) {
+ if (normalize_directories) {
+ auto normalized = std::string(internal::RemoveTrailingSlash(full_path));
+ return FileInfo(std::move(normalized), FileType::Directory);
+ } else {
+ return FileInfo(full_path, FileType::Directory);
+ }
}
auto info = FileInfo(full_path, FileType::File);
info.set_size(static_cast<int64_t>(meta.size()));
@@ -621,33 +694,43 @@ class GcsFileSystem::Impl {
gcs::Client client_;
};
+GcsOptions::GcsOptions() {
+ this->credentials.holder_ = std::make_shared<GcsCredentialsHolder>(
+ google::cloud::MakeGoogleDefaultCredentials());
+ this->scheme = "https";
+}
+
bool GcsOptions::Equals(const GcsOptions& other) const {
- return credentials == other.credentials &&
+ return credentials.Equals(other.credentials) &&
endpoint_override == other.endpoint_override && scheme == other.scheme &&
- default_bucket_location == other.default_bucket_location;
+ default_bucket_location == other.default_bucket_location &&
+ retry_limit_seconds == other.retry_limit_seconds;
}
GcsOptions GcsOptions::Defaults() {
- GcsOptions options{};
- options.credentials =
- std::make_shared<GcsCredentials>(google::cloud::MakeGoogleDefaultCredentials());
- options.scheme = "https";
+ GcsOptions options;
return options;
}
GcsOptions GcsOptions::Anonymous() {
GcsOptions options{};
- options.credentials =
- std::make_shared<GcsCredentials>(google::cloud::MakeInsecureCredentials());
+ options.credentials.holder_ =
+ std::make_shared<GcsCredentialsHolder>(google::cloud::MakeInsecureCredentials());
+ options.credentials.anonymous_ = true;
options.scheme = "http";
return options;
}
GcsOptions GcsOptions::FromAccessToken(const std::string& access_token,
- std::chrono::system_clock::time_point expiration) {
+ TimePoint expiration) {
GcsOptions options{};
- options.credentials = std::make_shared<GcsCredentials>(
- google::cloud::MakeAccessTokenCredentials(access_token, expiration));
+ options.credentials.holder_ =
+ std::make_shared<GcsCredentialsHolder>(google::cloud::MakeAccessTokenCredentials(
+ access_token,
+ std::chrono::time_point_cast<std::chrono::system_clock::time_point::duration>(
+ expiration)));
+ options.credentials.access_token_ = access_token;
+ options.credentials.expiration_ = expiration;
options.scheme = "https";
return options;
}
@@ -655,17 +738,20 @@ GcsOptions GcsOptions::FromAccessToken(const std::string& access_token,
GcsOptions GcsOptions::FromImpersonatedServiceAccount(
const GcsCredentials& base_credentials, const std::string&
target_service_account) {
GcsOptions options{};
- options.credentials = std::make_shared<GcsCredentials>(
+ options.credentials = base_credentials;
+ options.credentials.holder_ = std::make_shared<GcsCredentialsHolder>(
google::cloud::MakeImpersonateServiceAccountCredentials(
- base_credentials.credentials, target_service_account));
+ base_credentials.holder_->credentials, target_service_account));
+ options.credentials.target_service_account_ = target_service_account;
options.scheme = "https";
return options;
}
GcsOptions GcsOptions::FromServiceAccountCredentials(const std::string& json_object) {
GcsOptions options{};
- options.credentials = std::make_shared<GcsCredentials>(
+ options.credentials.holder_ = std::make_shared<GcsCredentialsHolder>(
google::cloud::MakeServiceAccountCredentials(json_object));
+ options.credentials.json_credentials_ = json_object;
options.scheme = "https";
return options;
}
@@ -698,11 +784,16 @@ Result<GcsOptions> GcsOptions::FromUri(const arrow::internal::Uri& uri,
options_map.emplace(kv.first, kv.second);
}
- if (!uri.password().empty() || !uri.username().empty()) {
- return Status::Invalid("GCS does not accept username or password.");
+ const std::string& username = uri.username();
+ bool anonymous = username == "anonymous";
+ if (!username.empty() && !anonymous) {
+ return Status::Invalid("GCS URIs do not accept username except
\"anonymous\".");
+ }
+ if (!uri.password().empty()) {
+ return Status::Invalid("GCS URIs do not accept password.");
}
+ auto options = anonymous ? GcsOptions::Anonymous() : GcsOptions::Defaults();
- auto options = GcsOptions::Defaults();
for (const auto& kv : options_map) {
if (kv.first == "location") {
options.default_bucket_location = kv.second;
@@ -710,6 +801,13 @@ Result<GcsOptions> GcsOptions::FromUri(const arrow::internal::Uri& uri,
options.scheme = kv.second;
} else if (kv.first == "endpoint_override") {
options.endpoint_override = kv.second;
+ } else if (kv.first == "retry_limit_seconds") {
+ double parsed_seconds = atof(kv.second.c_str());
+ if (parsed_seconds <= 0.0) {
+ return Status::Invalid("retry_limit_seconds must be a positive
integer, got '",
+ kv.second, "'");
+ }
+ options.retry_limit_seconds = parsed_seconds;
} else {
return Status::Invalid("Unexpected query parameter in GCS URI: '",
kv.first, "'");
}
@@ -726,6 +824,7 @@ Result<GcsOptions> GcsOptions::FromUri(const std::string& uri_string,
}
std::string GcsFileSystem::type_name() const { return "gcs"; }
+const GcsOptions& GcsFileSystem::options() const { return impl_->options(); }
bool GcsFileSystem::Equals(const FileSystem& other) const {
if (this == &other) {
diff --git a/cpp/src/arrow/filesystem/gcsfs.h b/cpp/src/arrow/filesystem/gcsfs.h
index c84374cdbb..8458c7f210 100644
--- a/cpp/src/arrow/filesystem/gcsfs.h
+++ b/cpp/src/arrow/filesystem/gcsfs.h
@@ -22,22 +22,56 @@
#include <vector>
#include "arrow/filesystem/filesystem.h"
+#include "arrow/util/optional.h"
#include "arrow/util/uri.h"
namespace arrow {
namespace fs {
-struct GcsCredentials;
+// Opaque wrapper for GCS's library credentials to avoid exposing in Arrow headers.
+struct GcsCredentialsHolder;
+class GcsFileSystem;
+
+/// \brief Container for GCS Credentials and information necessary to recreate them.
+class ARROW_EXPORT GcsCredentials {
+ public:
+ bool Equals(const GcsCredentials& other) const;
+ bool anonymous() const { return anonymous_; }
+ const std::string& access_token() const { return access_token_; }
+ TimePoint expiration() const { return expiration_; }
+ const std::string& target_service_account() const { return
target_service_account_; }
+ const std::string& json_credentials() const { return json_credentials_; }
+ const std::shared_ptr<GcsCredentialsHolder>& holder() const { return
holder_; }
+
+ private:
+ GcsCredentials() = default;
+ bool anonymous_ = false;
+ std::string access_token_;
+ TimePoint expiration_;
+ std::string target_service_account_;
+ std::string json_credentials_;
+ std::shared_ptr<GcsCredentialsHolder> holder_;
+ friend class GcsFileSystem;
+ friend struct GcsOptions;
+};
/// Options for the GcsFileSystem implementation.
struct ARROW_EXPORT GcsOptions {
- std::shared_ptr<GcsCredentials> credentials;
+ /// \brief Equivalent to GcsOptions::Defaults().
+ GcsOptions();
+ GcsCredentials credentials;
std::string endpoint_override;
std::string scheme;
/// \brief Location to use for creating buckets.
std::string default_bucket_location;
+ /// \brief If set used to control total time allowed for retrying underlying
+ /// errors.
+ ///
+ /// The default policy is to retry for up to 15 minutes.
+ arrow::util::optional<double> retry_limit_seconds;
+
/// \brief Default metadata for OpenOutputStream.
///
/// This will be ignored if non-empty metadata is passed to OpenOutputStream.
@@ -68,7 +102,7 @@ struct ARROW_EXPORT GcsOptions {
/// tokens. Note that access tokens are time limited, you will need to manually refresh
/// the tokens created by the out-of-band mechanism.
static GcsOptions FromAccessToken(const std::string& access_token,
- std::chrono::system_clock::time_point expiration);
+ TimePoint expiration);
/// \brief Initialize with service account impersonation
///
@@ -141,6 +175,7 @@ class ARROW_EXPORT GcsFileSystem : public FileSystem {
~GcsFileSystem() override = default;
std::string type_name() const override;
+ const GcsOptions& options() const;
bool Equals(const FileSystem& other) const override;
diff --git a/cpp/src/arrow/filesystem/gcsfs_internal.cc b/cpp/src/arrow/filesystem/gcsfs_internal.cc
index a75b51430f..b8f0ab80b2 100644
--- a/cpp/src/arrow/filesystem/gcsfs_internal.cc
+++ b/cpp/src/arrow/filesystem/gcsfs_internal.cc
@@ -296,7 +296,10 @@ Result<std::shared_ptr<const KeyValueMetadata>> FromObjectMetadata(
}
std::int64_t Depth(arrow::util::string_view path) {
- return std::count(path.begin(), path.end(), fs::internal::kSep);
+ // The last slash is not counted towards depth because it represents a
+ // directory.
+ bool has_trailing_slash = !path.empty() && path.back() == '/';
+ return std::count(path.begin(), path.end(), fs::internal::kSep) - has_trailing_slash;
}
} // namespace internal
diff --git a/cpp/src/arrow/filesystem/gcsfs_test.cc b/cpp/src/arrow/filesystem/gcsfs_test.cc
index 9eaacb0dc1..4d8f52ef48 100644
--- a/cpp/src/arrow/filesystem/gcsfs_test.cc
+++ b/cpp/src/arrow/filesystem/gcsfs_test.cc
@@ -50,6 +50,7 @@ namespace bp = boost::process;
namespace gc = google::cloud;
namespace gcs = google::cloud::storage;
+using ::testing::Eq;
using ::testing::HasSubstr;
using ::testing::IsEmpty;
using ::testing::Not;
@@ -99,6 +100,8 @@ class GcsTestbench : public ::testing::Environment {
error_ = std::move(error);
}
+ bool running() { return server_process_.running(); }
+
~GcsTestbench() override {
// Brutal shutdown, kill the full process group because the GCS testbench may launch
// additional children.
@@ -130,6 +133,7 @@ class GcsIntegrationTest : public ::testing::Test {
void SetUp() override {
ASSERT_THAT(Testbench(), NotNull());
ASSERT_THAT(Testbench()->error(), IsEmpty());
+ ASSERT_TRUE(Testbench()->running());
// Initialize a PRNG with a small amount of entropy.
generator_ = std::mt19937_64(std::random_device()());
@@ -140,7 +144,11 @@ class GcsIntegrationTest : public ::testing::Test {
auto client = gcs::Client(
google::cloud::Options{}
.set<gcs::RestEndpointOption>("http://127.0.0.1:" +
Testbench()->port())
- .set<gc::UnifiedCredentialsOption>(gc::MakeInsecureCredentials()));
+ .set<gc::UnifiedCredentialsOption>(gc::MakeInsecureCredentials())
+ .set<gcs::TransferStallTimeoutOption>(std::chrono::seconds(5))
+ .set<gcs::RetryPolicyOption>(
+ gcs::LimitedTimeRetryPolicy(std::chrono::seconds(45)).clone()));
+
google::cloud::StatusOr<gcs::BucketMetadata> bucket =
client.CreateBucketForProject(
PreexistingBucketName(), "ignored-by-testbench",
gcs::BucketMetadata{});
ASSERT_TRUE(bucket.ok()) << "Failed to create bucket <" << PreexistingBucketName()
@@ -167,6 +175,7 @@ class GcsIntegrationTest : public ::testing::Test {
GcsOptions TestGcsOptions() {
auto options = GcsOptions::Anonymous();
options.endpoint_override = "127.0.0.1:" + Testbench()->port();
+ options.retry_limit_seconds = 60;
return options;
}
@@ -291,15 +300,15 @@ TEST(GcsFileSystem, OptionsCompare) {
TEST(GcsFileSystem, OptionsAnonymous) {
GcsOptions a = GcsOptions::Anonymous();
- EXPECT_THAT(a.credentials, NotNull());
+ EXPECT_THAT(a.credentials.holder(), NotNull());
+ EXPECT_TRUE(a.credentials.anonymous());
EXPECT_EQ(a.scheme, "http");
}
TEST(GcsFileSystem, OptionsFromUri) {
std::string path;
- GcsOptions options;
- ASSERT_OK_AND_ASSIGN(options, GcsOptions::FromUri("gs://", &path));
+ ASSERT_OK_AND_ASSIGN(GcsOptions options, GcsOptions::FromUri("gs://", &path));
EXPECT_EQ(options.default_bucket_location, "");
EXPECT_EQ(options.scheme, "https");
EXPECT_EQ(options.endpoint_override, "");
@@ -327,35 +336,48 @@ TEST(GcsFileSystem, OptionsFromUri) {
ASSERT_OK_AND_ASSIGN(
options,
GcsOptions::FromUri("gs://mybucket/foo/bar/"
- "?endpoint_override=localhost&scheme=http&location=us-west2",
+ "?endpoint_override=localhost&scheme=http&location=us-west2"
+ "&retry_limit_seconds=40.5",
&path));
EXPECT_EQ(options.default_bucket_location, "us-west2");
EXPECT_EQ(options.scheme, "http");
EXPECT_EQ(options.endpoint_override, "localhost");
EXPECT_EQ(path, "mybucket/foo/bar");
+ ASSERT_TRUE(options.retry_limit_seconds.has_value());
+ EXPECT_EQ(*options.retry_limit_seconds, 40.5);
// Missing bucket name
ASSERT_RAISES(Invalid, GcsOptions::FromUri("gs:///foo/bar/", &path));
// Invalid option
ASSERT_RAISES(Invalid, GcsOptions::FromUri("gs://mybucket/?xxx=zzz", &path));
+
+ // Invalid retry limit
+ ASSERT_RAISES(Invalid,
+ GcsOptions::FromUri("gs://foo/bar/?retry_limit_seconds=0",
&path));
+ ASSERT_RAISES(Invalid,
+ GcsOptions::FromUri("gs://foo/bar/?retry_limit_seconds=-1",
&path));
}
TEST(GcsFileSystem, OptionsAccessToken) {
- auto a = GcsOptions::FromAccessToken(
- "invalid-access-token-test-only",
- std::chrono::system_clock::now() + std::chrono::minutes(5));
- EXPECT_THAT(a.credentials, NotNull());
+ TimePoint expiration = std::chrono::system_clock::now() + std::chrono::minutes(5);
+ auto a = GcsOptions::FromAccessToken(/*access_token=*/"accesst", expiration);
+ EXPECT_THAT(a.credentials.holder(), NotNull());
+ EXPECT_THAT(a.credentials.access_token(), Eq("accesst"));
+ EXPECT_THAT(a.credentials.expiration(), Eq(expiration));
EXPECT_EQ(a.scheme, "https");
}
TEST(GcsFileSystem, OptionsImpersonateServiceAccount) {
- auto base = GcsOptions::FromAccessToken(
- "invalid-access-token-test-only",
- std::chrono::system_clock::now() + std::chrono::minutes(5));
- auto a = GcsOptions::FromImpersonatedServiceAccount(
- *base.credentials, "[email protected]");
- EXPECT_THAT(a.credentials, NotNull());
+ TimePoint expiration = std::chrono::system_clock::now() + std::chrono::minutes(5);
+ auto base = GcsOptions::FromAccessToken(/*access_token=*/"at", expiration);
+ std::string account = "[email protected]";
+ auto a = GcsOptions::FromImpersonatedServiceAccount(base.credentials, account);
+ EXPECT_THAT(a.credentials.holder(), NotNull());
+ EXPECT_THAT(a.credentials.access_token(), Eq("at"));
+ EXPECT_THAT(a.credentials.expiration(), Eq(expiration));
+ EXPECT_THAT(a.credentials.target_service_account(), Eq(account));
+
EXPECT_EQ(a.scheme, "https");
}
@@ -378,7 +400,8 @@ TEST(GcsFileSystem, OptionsServiceAccountCredentials) {
})""";
auto a = GcsOptions::FromServiceAccountCredentials(kJsonKeyfileContents);
- EXPECT_THAT(a.credentials, NotNull());
+ EXPECT_THAT(a.credentials.holder(), NotNull());
+ EXPECT_THAT(a.credentials.json_credentials(), kJsonKeyfileContents);
EXPECT_EQ(a.scheme, "https");
}
@@ -568,7 +591,50 @@ TEST_F(GcsIntegrationTest, GetFileInfoBucket) {
ASSERT_RAISES(Invalid, fs->GetFileInfo("gs://" + PreexistingBucketName()));
}
-TEST_F(GcsIntegrationTest, GetFileInfoObject) {
+TEST_F(GcsIntegrationTest, GetFileInfoObjectWithNestedStructure) {
+ // Adds detailed tests to handle cases of different edge cases
+ // with directory naming conventions (e.g. with and without slashes).
+ auto fs = GcsFileSystem::Make(TestGcsOptions());
+ constexpr auto kObjectName = "test-object-dir/some_other_dir/another_dir/foo";
+ ASSERT_OK_AND_ASSIGN(
+ auto output,
+ fs->OpenOutputStream(PreexistingBucketPath() + kObjectName,
/*metadata=*/{}));
+ const auto data = std::string(kLoremIpsum);
+ ASSERT_OK(output->Write(data.data(), data.size()));
+ ASSERT_OK(output->Close());
+
+ // 0 is immediately after "/" lexicographically, ensure that this doesn't
+ // cause unexpected issues.
+ ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(PreexistingBucketPath() +
+ "test-object-dir/some_other_dir0",
+ /*metadata=*/{}));
+ ASSERT_OK(output->Write(data.data(), data.size()));
+ ASSERT_OK(output->Close());
+ ASSERT_OK_AND_ASSIGN(
+ output,
+ fs->OpenOutputStream(PreexistingBucketPath() + kObjectName + "0",
/*metadata=*/{}));
+ ASSERT_OK(output->Write(data.data(), data.size()));
+ ASSERT_OK(output->Close());
+
+ AssertFileInfo(fs.get(), PreexistingBucketPath() + kObjectName, FileType::File);
+ AssertFileInfo(fs.get(), PreexistingBucketPath() + kObjectName + "/",
+ FileType::NotFound);
+ AssertFileInfo(fs.get(), PreexistingBucketPath() + "test-object-dir",
+ FileType::Directory);
+ AssertFileInfo(fs.get(), PreexistingBucketPath() + "test-object-dir/",
+ FileType::Directory);
+ AssertFileInfo(fs.get(), PreexistingBucketPath() + "test-object-dir/some_other_dir",
+ FileType::Directory);
+ AssertFileInfo(fs.get(), PreexistingBucketPath() + "test-object-dir/some_other_dir/",
+ FileType::Directory);
+
+ AssertFileInfo(fs.get(), PreexistingBucketPath() + "test-object-di",
+ FileType::NotFound);
+ AssertFileInfo(fs.get(), PreexistingBucketPath() + "test-object-dir/some_other_di",
+ FileType::NotFound);
+}
+
+TEST_F(GcsIntegrationTest, GetFileInfoObjectNoExplicitObject) {
auto fs = GcsFileSystem::Make(TestGcsOptions());
auto object =
GcsClient().GetObjectMetadata(PreexistingBucketName(), PreexistingObjectName());
@@ -633,7 +699,7 @@ TEST_F(GcsIntegrationTest, GetFileInfoSelectorLimitedRecursion) {
SCOPED_TRACE("Testing with max_recursion=" +
std::to_string(max_recursion));
const auto max_depth =
internal::Depth(internal::EnsureTrailingSlash(hierarchy.base_dir)) +
- max_recursion;
+ max_recursion + 1; // Add 1 because files in a directory should be included
std::vector<arrow::fs::FileInfo> expected;
std::copy_if(hierarchy.contents.begin(), hierarchy.contents.end(),
std::back_inserter(expected), [&](const arrow::fs::FileInfo& info) {
@@ -727,6 +793,13 @@ TEST_F(GcsIntegrationTest, CreateDirUri) {
ASSERT_RAISES(Invalid, fs->CreateDir("gs://" + RandomBucketName(), true));
}
+TEST_F(GcsIntegrationTest, DeleteBucketDirSuccess) {
+ auto fs = GcsFileSystem::Make(TestGcsOptions());
+ ASSERT_OK(fs->CreateDir("pyarrow-filesystem/", /*recursive=*/true));
+ ASSERT_RAISES(Invalid, fs->CreateDir("/", false));
+ ASSERT_OK(fs->DeleteDir("pyarrow-filesystem/"));
+}
+
TEST_F(GcsIntegrationTest, DeleteDirSuccess) {
auto fs = GcsFileSystem::Make(TestGcsOptions());
ASSERT_OK_AND_ASSIGN(auto hierarchy, CreateHierarchy(fs));
@@ -1257,6 +1330,16 @@ TEST_F(GcsIntegrationTest, OpenInputFileClosed) {
ASSERT_RAISES(Invalid, stream->Seek(2));
}
+TEST_F(GcsIntegrationTest, TestFileSystemFromUri) {
+ // Smoke test for FileSystemFromUri
+ ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFromUri(std::string("gs://anonymous@") +
+ PreexistingBucketPath()));
+ EXPECT_EQ(fs->type_name(), "gcs");
+ ASSERT_OK_AND_ASSIGN(auto fs2, FileSystemFromUri(std::string("gcs://anonymous@") +
+ PreexistingBucketPath()));
+ EXPECT_EQ(fs2->type_name(), "gcs");
+}
+
} // namespace
} // namespace fs
} // namespace arrow
diff --git a/cpp/src/arrow/util/config.h.cmake b/cpp/src/arrow/util/config.h.cmake
index 55bc2d0100..bd6447a20e 100644
--- a/cpp/src/arrow/util/config.h.cmake
+++ b/cpp/src/arrow/util/config.h.cmake
@@ -45,6 +45,7 @@
#cmakedefine ARROW_IPC
#cmakedefine ARROW_JSON
+#cmakedefine ARROW_GCS
#cmakedefine ARROW_S3
#cmakedefine ARROW_USE_NATIVE_INT128
#cmakedefine ARROW_WITH_OPENTELEMETRY
diff --git a/dev/archery/archery/cli.py b/dev/archery/archery/cli.py
index dcf15afafe..8b4c38b42f 100644
--- a/dev/archery/archery/cli.py
+++ b/dev/archery/archery/cli.py
@@ -162,6 +162,8 @@ def _apply_options(cmd, options):
help="Build with Flight rpc support.")
@click.option("--with-gandiva", default=None, type=BOOL,
help="Build with Gandiva expression compiler support.")
+@click.option("--with-gcs", default=None, type=BOOL,
+ help="Build Arrow with Google Cloud Storage (GCS) support.")
@click.option("--with-hdfs", default=None, type=BOOL,
help="Build the Arrow HDFS bridge.")
@click.option("--with-hiveserver2", default=None, type=BOOL,
diff --git a/dev/archery/archery/lang/cpp.py b/dev/archery/archery/lang/cpp.py
index ac3b251f48..158cc48bad 100644
--- a/dev/archery/archery/lang/cpp.py
+++ b/dev/archery/archery/lang/cpp.py
@@ -52,7 +52,8 @@ class CppConfiguration:
# Components
with_compute=None, with_csv=None, with_cuda=None,
with_dataset=None, with_filesystem=None, with_flight=None,
- with_gandiva=None, with_hdfs=None, with_hiveserver2=None,
+ with_gandiva=None, with_gcs=None, with_hdfs=None,
+ with_hiveserver2=None,
with_ipc=True, with_json=None, with_jni=None,
with_mimalloc=None, with_jemalloc=None,
with_parquet=None, with_plasma=None, with_python=True,
@@ -95,6 +96,7 @@ class CppConfiguration:
self.with_filesystem = with_filesystem
self.with_flight = with_flight
self.with_gandiva = with_gandiva
+ self.with_gcs = with_gcs
self.with_hdfs = with_hdfs
self.with_hiveserver2 = with_hiveserver2
self.with_ipc = with_ipc
@@ -218,7 +220,7 @@ class CppConfiguration:
yield ("ARROW_FILESYSTEM", truthifier(self.with_filesystem))
yield ("ARROW_FLIGHT", truthifier(self.with_flight))
yield ("ARROW_GANDIVA", truthifier(self.with_gandiva))
- yield ("ARROW_PARQUET", truthifier(self.with_parquet))
+ yield ("ARROW_GCS", truthifier(self.with_gcs))
yield ("ARROW_HDFS", truthifier(self.with_hdfs))
yield ("ARROW_HIVESERVER2", truthifier(self.with_hiveserver2))
yield ("ARROW_IPC", truthifier(self.with_ipc))
@@ -226,6 +228,7 @@ class CppConfiguration:
yield ("ARROW_JNI", truthifier(self.with_jni))
yield ("ARROW_MIMALLOC", truthifier(self.with_mimalloc))
yield ("ARROW_JEMALLOC", truthifier(self.with_jemalloc))
+ yield ("ARROW_PARQUET", truthifier(self.with_parquet))
yield ("ARROW_PLASMA", truthifier(self.with_plasma))
yield ("ARROW_PYTHON", truthifier(self.with_python))
yield ("ARROW_S3", truthifier(self.with_s3))
diff --git a/dev/release/verify-release-candidate.sh b/dev/release/verify-release-candidate.sh
index a512449aea..cbf3c9c51e 100755
--- a/dev/release/verify-release-candidate.sh
+++ b/dev/release/verify-release-candidate.sh
@@ -662,6 +662,9 @@ test_python() {
if [ "${ARROW_GANDIVA}" = "ON" ]; then
export PYARROW_WITH_GANDIVA=1
fi
+ if [ "${ARROW_GCS}" = "ON" ]; then
+ export PYARROW_WITH_GCS=1
+ fi
if [ "${ARROW_PLASMA}" = "ON" ]; then
export PYARROW_WITH_PLASMA=1
fi
@@ -694,6 +697,9 @@ import pyarrow.parquet
if [ "${ARROW_GANDIVA}" == "ON" ]; then
python -c "import pyarrow.gandiva"
fi
+ if [ "${ARROW_GCS}" == "ON" ]; then
+ python -c "import pyarrow._gcsfs"
+ fi
if [ "${ARROW_PLASMA}" == "ON" ]; then
python -c "import pyarrow.plasma"
fi
@@ -701,6 +707,7 @@ import pyarrow.parquet
python -c "import pyarrow._s3fs"
fi
+
# Install test dependencies
pip install -r requirements-test.txt
@@ -1000,6 +1007,7 @@ test_linux_wheels() {
}
test_macos_wheels() {
+ local check_gcs=ON
local check_s3=ON
local check_flight=ON
@@ -1019,6 +1027,7 @@ test_macos_wheels() {
for platform in ${platform_tags}; do
show_header "Testing Python ${pyver} wheel for platform ${platform}"
if [[ "$platform" == *"10_9"* ]]; then
+ check_gcs=OFF
check_s3=OFF
fi
@@ -1026,7 +1035,7 @@ test_macos_wheels() {
VENV_ENV=wheel-${pyver}-${platform} PYTHON_VERSION=${pyver} maybe_setup_virtualenv || continue
pip install pyarrow-${VERSION}-cp${pyver/.}-cp${python/.}-${platform}.whl
- INSTALL_PYARROW=OFF ARROW_FLIGHT=${check_flight} ARROW_S3=${check_s3} \
+ INSTALL_PYARROW=OFF ARROW_FLIGHT=${check_flight} ARROW_GCS=${check_gcs} ARROW_S3=${check_s3} \
${ARROW_SOURCE_DIR}/ci/scripts/python_wheel_unix_test.sh ${ARROW_SOURCE_DIR}
done
done
@@ -1155,9 +1164,9 @@ fi
: ${ARROW_CUDA:=OFF}
: ${ARROW_FLIGHT:=ON}
: ${ARROW_GANDIVA:=ON}
+: ${ARROW_GCS:=OFF}
: ${ARROW_PLASMA:=ON}
: ${ARROW_S3:=OFF}
-: ${ARROW_GCS:=OFF}
TEST_SUCCESS=no
diff --git a/dev/tasks/conda-recipes/arrow-cpp/bld-pyarrow.bat b/dev/tasks/conda-recipes/arrow-cpp/bld-pyarrow.bat
index f0e26f0bc8..a03a37722f 100644
--- a/dev/tasks/conda-recipes/arrow-cpp/bld-pyarrow.bat
+++ b/dev/tasks/conda-recipes/arrow-cpp/bld-pyarrow.bat
@@ -17,6 +17,7 @@ pushd "%SRC_DIR%"\python
SET ARROW_HOME=%LIBRARY_PREFIX%
SET SETUPTOOLS_SCM_PRETEND_VERSION=%PKG_VERSION%
SET PYARROW_BUILD_TYPE=release
+SET PYARROW_WITH_GCS=1
SET PYARROW_WITH_S3=1
SET PYARROW_WITH_HDFS=1
SET PYARROW_WITH_DATASET=1
diff --git a/dev/tasks/conda-recipes/arrow-cpp/build-pyarrow.sh b/dev/tasks/conda-recipes/arrow-cpp/build-pyarrow.sh
index 826942b62c..6e23c5eed9 100644
--- a/dev/tasks/conda-recipes/arrow-cpp/build-pyarrow.sh
+++ b/dev/tasks/conda-recipes/arrow-cpp/build-pyarrow.sh
@@ -17,6 +17,7 @@ if [[ "${target_platform}" == "osx-arm64" ]]; then
else
export PYARROW_WITH_GANDIVA=1
fi
+export PYARROW_WITH_GCS=1
export PYARROW_WITH_HDFS=1
export PYARROW_WITH_ORC=1
export PYARROW_WITH_PARQUET=1
diff --git a/dev/tasks/homebrew-formulae/apache-arrow.rb b/dev/tasks/homebrew-formulae/apache-arrow.rb
index a22b62afc2..94a1a67a1a 100644
--- a/dev/tasks/homebrew-formulae/apache-arrow.rb
+++ b/dev/tasks/homebrew-formulae/apache-arrow.rb
@@ -72,6 +72,7 @@ class ApacheArrow < Formula
args = %W[
-DARROW_FLIGHT=ON
-DARROW_GANDIVA=ON
+ -DARROW_GCS=ON
-DARROW_INSTALL_NAME_RPATH=OFF
-DARROW_JEMALLOC=ON
-DARROW_MIMALLOC=ON
@@ -91,7 +92,6 @@ class ApacheArrow < Formula
-DCMAKE_FIND_PACKAGE_PREFER_CONFIG=TRUE
-DPython3_EXECUTABLE=#{Formula["[email protected]"].bin/"python3"}
]
- # Re-enable -DARROW_S3=ON and add back aws-sdk-cpp to depends_on in ARROW-6437
mkdir "build" do
system "cmake", "../cpp", *std_cmake_args, *args
diff --git a/dev/tasks/python-wheels/github.osx.amd64.yml b/dev/tasks/python-wheels/github.osx.amd64.yml
index d0f834f40c..c18e080ac2 100644
--- a/dev/tasks/python-wheels/github.osx.amd64.yml
+++ b/dev/tasks/python-wheels/github.osx.amd64.yml
@@ -93,7 +93,7 @@ jobs:
$PYTHON -m venv build-env
source build-env/bin/activate
pip install --upgrade pip wheel
- arrow/ci/scripts/python_wheel_macos_build.sh x86_64 $(pwd)/arrow $(pwd)/build
+ PYTHON=python arrow/ci/scripts/python_wheel_macos_build.sh x86_64 $(pwd)/arrow $(pwd)/build
- name: Test Wheel
shell: bash
@@ -101,6 +101,8 @@ jobs:
$PYTHON -m venv test-env
source test-env/bin/activate
pip install --upgrade pip wheel
+ pip install -r arrow/python/requirements-wheel-test.txt
+ PYTHON=python arrow/ci/scripts/install_gcs_testbench.sh default
arrow/ci/scripts/python_wheel_unix_test.sh $(pwd)/arrow
{{ macros.github_upload_releases("arrow/python/repaired_wheels/*.whl")|indent }}
diff --git a/dev/tasks/python-wheels/github.osx.arm64.yml b/dev/tasks/python-wheels/github.osx.arm64.yml
index 101d8c6ee8..7198da7de4 100644
--- a/dev/tasks/python-wheels/github.osx.arm64.yml
+++ b/dev/tasks/python-wheels/github.osx.arm64.yml
@@ -134,6 +134,8 @@ jobs:
$PYTHON -m venv test-arm64-env
source test-arm64-env/bin/activate
pip install --upgrade pip wheel
+ arch -arm64 pip install -r arrow/python/requirements-wheel-test.txt
+ PYTHON=python arch -arm64 arrow/ci/scripts/install_gcs_testbench.sh default
arch -arm64 arrow/ci/scripts/python_wheel_unix_test.sh $(pwd)/arrow
{% if arch == "universal2" %}
@@ -145,6 +147,8 @@ jobs:
$PYTHON -m venv test-amd64-env
source test-amd64-env/bin/activate
pip install --upgrade pip wheel
+ arch -x86_64 pip install -r arrow/python/requirements-wheel-test.txt
+ PYTHON=python arch -x86_64 arrow/ci/scripts/install_gcs_testbench.sh default
arch -x86_64 arrow/ci/scripts/python_wheel_unix_test.sh $(pwd)/arrow
{% endif %}
diff --git a/dev/tasks/tasks.yml b/dev/tasks/tasks.yml
index 95d404932c..1bdb4d2821 100644
--- a/dev/tasks/tasks.yml
+++ b/dev/tasks/tasks.yml
@@ -468,9 +468,9 @@ tasks:
{############################## Wheel OSX ####################################}
-# enable S3 support from macOS 10.13 so we don't need to bundle curl, crypt and ssl
-{% for macos_version, macos_codename, arrow_s3 in [("10.9", "mavericks", "OFF"),
- ("10.13", "high-sierra", "ON")] %}
+# enable S3 and GCS support from macOS 10.13 so we don't need to bundle curl, crypt and ssl
+{% for macos_version, macos_codename, arrow_s3, arrow_gcs in [("10.9", "mavericks", "OFF", "OFF"),
+ ("10.13", "high-sierra", "ON", "ON")] %}
{% set platform_tag = "macosx_{}_x86_64".format(macos_version.replace('.', '_')) %}
wheel-macos-{{ macos_codename }}-{{ python_tag }}-amd64:
@@ -480,6 +480,7 @@ tasks:
python_version: "{{ python_version }}"
macos_deployment_target: "{{ macos_version }}"
arrow_s3: "{{ arrow_s3 }}"
+ arrow_gcs: "{{ arrow_gcs }}"
artifacts:
- pyarrow-{no_rc_version}-{{ python_tag }}-{{ abi_tag }}-{{ platform_tag }}.whl
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 7386c256fa..a657f56bb2 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -412,6 +412,10 @@ set(CYTHON_EXTENSIONS
set(LINK_LIBS arrow_shared arrow_python_shared)
+if(PYARROW_BUILD_GCS)
+ set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _gcsfs)
+endif()
+
if(PYARROW_BUILD_S3)
set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _s3fs)
endif()
diff --git a/python/asv-build.sh b/python/asv-build.sh
index 7de5ff4a2c..188085f927 100755
--- a/python/asv-build.sh
+++ b/python/asv-build.sh
@@ -49,6 +49,7 @@ cmake -GNinja \
-DARROW_CXXFLAGS=$CXXFLAGS \
-DARROW_USE_GLOG=off \
-DARROW_FLIGHT=on \
+ -DARROW_GCS=on \
-DARROW_ORC=on \
-DARROW_PARQUET=on \
-DARROW_PYTHON=on \
@@ -66,6 +67,7 @@ export SETUPTOOLS_SCM_PRETEND_VERSION=0.0.1
export PYARROW_BUILD_TYPE=release
export PYARROW_PARALLEL=8
export PYARROW_WITH_FLIGHT=1
+export PYARROW_WITH_GCS=1
export PYARROW_WITH_ORC=1
export PYARROW_WITH_PARQUET=1
export PYARROW_WITH_PLASMA=1
diff --git a/python/pyarrow/_fs.pyx b/python/pyarrow/_fs.pyx
index 75ad0ccd9b..af5bebe7d6 100644
--- a/python/pyarrow/_fs.pyx
+++ b/python/pyarrow/_fs.pyx
@@ -367,6 +367,9 @@ cdef class FileSystem(_Weakrefable):
elif typ == 's3':
from pyarrow._s3fs import S3FileSystem
self = S3FileSystem.__new__(S3FileSystem)
+ elif typ == 'gcs':
+ from pyarrow._gcsfs import GcsFileSystem
+ self = GcsFileSystem.__new__(GcsFileSystem)
elif typ == 'hdfs':
from pyarrow._hdfs import HadoopFileSystem
self = HadoopFileSystem.__new__(HadoopFileSystem)
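
With this dispatch in place, FileSystem.from_uri recognizes gs:// URIs. For
illustration (not part of the patch; the endpoint and port are hypothetical),
mirroring test_filesystem_from_uri_gcs further below:

    from pyarrow.fs import FileSystem, GcsFileSystem

    # Anonymous access against a hypothetical local emulator endpoint.
    fs, path = FileSystem.from_uri(
        'gs://anonymous@mybucket/foo/bar?scheme=http'
        '&endpoint_override=localhost:9001&retry_limit_seconds=5')
    assert isinstance(fs, GcsFileSystem)
    assert path == 'mybucket/foo/bar'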
diff --git a/python/pyarrow/_gcsfs.pyx b/python/pyarrow/_gcsfs.pyx
new file mode 100644
index 0000000000..9cff12fb2e
--- /dev/null
+++ b/python/pyarrow/_gcsfs.pyx
@@ -0,0 +1,188 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+from pyarrow.lib cimport (check_status, pyarrow_wrap_metadata,
+ pyarrow_unwrap_metadata)
+from pyarrow.lib import frombytes, tobytes, KeyValueMetadata, ensure_metadata
+from pyarrow.includes.common cimport *
+from pyarrow.includes.libarrow cimport *
+from pyarrow.includes.libarrow_fs cimport *
+from pyarrow._fs cimport FileSystem, TimePoint_to_ns, PyDateTime_to_TimePoint
+from cython.operator cimport dereference as deref
+
+from datetime import datetime, timedelta, timezone
+
+
+cdef class GcsFileSystem(FileSystem):
+ """
+ Google Cloud Storage (GCS) backed FileSystem implementation
+
+ By default uses the process described in https://google.aip.dev/auth/4110
+ to resolve credentials. If not running on Google Cloud Platform (GCP),
+ this generally requires the environment variable
+ GOOGLE_APPLICATION_CREDENTIALS to point to a JSON file
+ containing credentials.
+
+ Note: GCS buckets are special and the operations available on them may be
+ limited or more expensive than expected compared to local file systems.
+
+ Note: When pickling a GcsFileSystem that uses default credentials, the
+ resolved credentials are not stored in the serialized data. Therefore, when
+ unpickling it is assumed that the necessary credentials are in place for the
+ target process.
+
+ Parameters
+ ----------
+ anonymous : boolean, default False
+ Whether to connect anonymously.
+ If true, will not attempt to look up credentials using standard GCP
+ configuration methods.
+ access_token : str, default None
+ GCP access token. If provided, temporary credentials will be fetched by
+ assuming this role; `credential_token_expiration` must also be specified.
+ target_service_account : str, default None
+ An optional service account to try to impersonate when accessing GCS. This
+ requires the specified credential user or service account to have the
+ necessary permissions.
+ credential_token_expiration : datetime, default None
+ Expiration for credential generated with an access token. Must be
+ specified if `access_token` is specified.
+ default_bucket_location : str, default 'US'
+ GCP region to create buckets in.
+ scheme : str, default 'https'
+ GCS connection transport scheme.
+ endpoint_override : str, default None
+ Override endpoint with a connect string such as "localhost:9000"
+ default_metadata : mapping or pyarrow.KeyValueMetadata, default None
+ Default metadata for `open_output_stream`. This will be ignored if
+ non-empty metadata is passed to `open_output_stream`.
+ retry_time_limit : timedelta, default None
+ Set the maximum amount of time the GCS client will attempt to retry
+ transient errors. Subsecond granularity is ignored.
+ """
+
+ cdef:
+ CGcsFileSystem* gcsfs
+
+ def __init__(self, *, bint anonymous=False, access_token=None,
+ target_service_account=None, credential_token_expiration=None,
+ default_bucket_location='US',
+ scheme=None,
+ endpoint_override=None,
+ default_metadata=None,
+ retry_time_limit=None):
+ cdef:
+ CGcsOptions options
+ shared_ptr[CGcsFileSystem] wrapped
+ double time_limit_seconds
+
+ # Intentional use of truthiness: empty strings aren't valid, and
+ # reconstruction from pickling will supply empty strings.
+ if anonymous and (target_service_account or access_token):
+ raise ValueError(
+ 'anonymous option is not compatible with '
+ 'target_service_account and access_token'
+ )
+ elif bool(access_token) != bool(credential_token_expiration):
+ raise ValueError(
+ 'access_token and credential_token_expiration must be '
+ 'specified together'
+ )
+
+ elif anonymous:
+ options = CGcsOptions.Anonymous()
+ elif access_token:
+ if not isinstance(credential_token_expiration, datetime):
+ raise ValueError(
+ "credential_token_expiration must be a datetime")
+ options = CGcsOptions.FromAccessToken(
+ tobytes(access_token),
+ PyDateTime_to_TimePoint(<PyDateTime_DateTime*>credential_token_expiration))
+ else:
+ options = CGcsOptions.Defaults()
+
+ # Target service account requires base credentials so
+ # it is not part of the if/else chain above which only
+ # handles base credentials.
+ if target_service_account:
+ options = CGcsOptions.FromImpersonatedServiceAccount(
+ options.credentials, tobytes(target_service_account))
+
+ options.default_bucket_location = tobytes(default_bucket_location)
+
+ if scheme is not None:
+ options.scheme = tobytes(scheme)
+ if endpoint_override is not None:
+ options.endpoint_override = tobytes(endpoint_override)
+ if default_metadata is not None:
+ options.default_metadata = pyarrow_unwrap_metadata(
+ ensure_metadata(default_metadata))
+ if retry_time_limit is not None:
+ time_limit_seconds = retry_time_limit.total_seconds()
+ options.retry_limit_seconds = time_limit_seconds
+
+ with nogil:
+ wrapped = GetResultValue(CGcsFileSystem.Make(options))
+
+ self.init(<shared_ptr[CFileSystem]> wrapped)
+
+ cdef init(self, const shared_ptr[CFileSystem]& wrapped):
+ FileSystem.init(self, wrapped)
+ self.gcsfs = <CGcsFileSystem*> wrapped.get()
+
+ @classmethod
+ def _reconstruct(cls, kwargs):
+ return cls(**kwargs)
+
+ def _expiration_datetime_from_options(self):
+ expiration_ns = TimePoint_to_ns(
+ self.gcsfs.options().credentials.expiration())
+ if expiration_ns == 0:
+ return None
+ return datetime.fromtimestamp(expiration_ns / 1.0e9, timezone.utc)
+
+ def __reduce__(self):
+ cdef CGcsOptions opts = self.gcsfs.options()
+ service_account = frombytes(opts.credentials.target_service_account())
+ expiration_dt = self._expiration_datetime_from_options()
+ retry_time_limit = None
+ if opts.retry_limit_seconds.has_value():
+ retry_time_limit = timedelta(
+ seconds=opts.retry_limit_seconds.value())
+ return (
+ GcsFileSystem._reconstruct, (dict(
+ access_token=frombytes(opts.credentials.access_token()),
+ anonymous=opts.credentials.anonymous(),
+ credential_token_expiration=expiration_dt,
+ target_service_account=service_account,
+ scheme=frombytes(opts.scheme),
+ endpoint_override=frombytes(opts.endpoint_override),
+ default_bucket_location=frombytes(
+ opts.default_bucket_location),
+ default_metadata=pyarrow_wrap_metadata(opts.default_metadata),
+ retry_time_limit=retry_time_limit
+ ),))
+
+ @property
+ def default_bucket_location(self):
+ """
+ The GCP location this filesystem will write to.
+ """
+ return frombytes(self.gcsfs.options().default_bucket_location)
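
For illustration (not part of the patch), a minimal sketch of the options and
pickling behavior implemented above, mirroring the test_gcs_options coverage
further below; the token and location values are placeholders:

    from datetime import datetime, timedelta
    import pickle
    from pyarrow.fs import GcsFileSystem

    # An access token must be paired with an expiration (see __init__ above).
    expiration = datetime.now() + timedelta(hours=1)
    fs = GcsFileSystem(access_token='abc',
                       credential_token_expiration=expiration,
                       default_bucket_location='us-west2')
    assert fs.default_bucket_location == 'us-west2'
    # __reduce__ serializes the options, so the filesystem round-trips:
    assert pickle.loads(pickle.dumps(fs)) == fs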
diff --git a/python/pyarrow/conftest.py b/python/pyarrow/conftest.py
index b114f7d1c6..638dad8568 100644
--- a/python/pyarrow/conftest.py
+++ b/python/pyarrow/conftest.py
@@ -26,6 +26,7 @@ groups = [
'hypothesis',
'fastparquet',
'gandiva',
+ 'gcs',
'gdb',
'gzip',
'hdfs',
@@ -56,6 +57,7 @@ defaults = {
'fastparquet': False,
'flight': False,
'gandiva': False,
+ 'gcs': False,
'gdb': True,
'gzip': Codec.is_available('gzip'),
'hdfs': False,
@@ -145,6 +147,13 @@ try:
except ImportError:
pass
+try:
+ from pyarrow.fs import GcsFileSystem # noqa
+ defaults['gcs'] = True
+except ImportError:
+ pass
+
+
try:
from pyarrow.fs import S3FileSystem # noqa
defaults['s3'] = True
diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py
index f22eaf0304..932fc82789 100644
--- a/python/pyarrow/fs.py
+++ b/python/pyarrow/fs.py
@@ -45,6 +45,11 @@ try:
except ImportError:
_not_imported.append("HadoopFileSystem")
+try:
+ from pyarrow._gcsfs import GcsFileSystem # noqa
+except ImportError:
+ _not_imported.append("GcsFileSystem")
+
try:
from pyarrow._s3fs import ( # noqa
S3FileSystem, S3LogLevel, initialize_s3, finalize_s3,
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index ba651af50b..c55fd315b1 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -54,6 +54,13 @@ cdef extern from "arrow/util/decimal.h" namespace "arrow" nogil:
cdef cppclass CDecimal256" arrow::Decimal256":
c_string ToString(int32_t scale) const
+cdef extern from "arrow/util/optional.h" namespace "arrow::util" nogil:
+ cdef cppclass c_optional"arrow::util::optional"[T]:
+ c_bool has_value()
+ T value()
+ c_optional(T&)
+ c_optional& operator=[U](U&)
+
cdef extern from "arrow/config.h" namespace "arrow" nogil:
cdef cppclass CBuildInfo" arrow::BuildInfo":
diff --git a/python/pyarrow/includes/libarrow_fs.pxd b/python/pyarrow/includes/libarrow_fs.pxd
index e491233e88..d0e1501003 100644
--- a/python/pyarrow/includes/libarrow_fs.pxd
+++ b/python/pyarrow/includes/libarrow_fs.pxd
@@ -200,6 +200,40 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil:
cdef CResult[c_string] ResolveS3BucketRegion(const c_string& bucket)
+ cdef cppclass CGcsCredentials "arrow::fs::GcsCredentials":
+ c_bool anonymous()
+ CTimePoint expiration()
+ c_string access_token()
+ c_string target_service_account()
+
+ cdef cppclass CGcsOptions "arrow::fs::GcsOptions":
+ CGcsCredentials credentials
+ c_string endpoint_override
+ c_string scheme
+ c_string default_bucket_location
+ c_optional[double] retry_limit_seconds
+ shared_ptr[const CKeyValueMetadata] default_metadata
+ c_bool Equals(const CGcsOptions& other)
+
+ @staticmethod
+ CGcsOptions Defaults()
+
+ @staticmethod
+ CGcsOptions Anonymous()
+
+ @staticmethod
+ CGcsOptions FromAccessToken(const c_string& access_token,
+ CTimePoint expiration)
+
+ @staticmethod
+ CGcsOptions FromImpersonatedServiceAccount(const CGcsCredentials& base_credentials,
+ c_string& target_service_account)
+
+ cdef cppclass CGcsFileSystem "arrow::fs::GcsFileSystem":
+ @staticmethod
+ CResult[shared_ptr[CGcsFileSystem]] Make(const CGcsOptions& options)
+ CGcsOptions options()
+
cdef cppclass CHdfsOptions "arrow::fs::HdfsOptions":
HdfsConnectionConfig connection_config
int32_t buffer_size
diff --git a/python/pyarrow/tests/conftest.py b/python/pyarrow/tests/conftest.py
index 0b7f1618b0..a06ac92095 100644
--- a/python/pyarrow/tests/conftest.py
+++ b/python/pyarrow/tests/conftest.py
@@ -18,6 +18,7 @@
import os
import pathlib
import subprocess
+import sys
from tempfile import TemporaryDirectory
import pytest
@@ -173,3 +174,23 @@ def s3_server(s3_connection):
finally:
if proc is not None:
proc.kill()
+
+
+@pytest.fixture(scope='session')
+def gcs_server():
+ port = find_free_port()
+ env = os.environ.copy()
+ args = [sys.executable, '-m', 'testbench', '--port', str(port)]
+ proc = None
+ try:
+ proc = subprocess.Popen(args, env=env)
+ except OSError as e:
+ pytest.skip(f"Command {args} failed to execute: {e}")
+ else:
+ yield {
+ 'connection': ('localhost', port),
+ 'process': proc,
+ }
+ finally:
+ if proc is not None:
+ proc.kill()
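
The fixture above launches Google's storage testbench (installed by
ci/scripts/install_gcs_testbench.sh) in a subprocess. A hand-run equivalent,
with the port chosen arbitrarily for illustration:

    import subprocess
    import sys
    from datetime import timedelta
    from pyarrow.fs import GcsFileSystem

    # Start the emulator the same way the fixture does; a real script should
    # wait for the port to accept connections before issuing requests.
    proc = subprocess.Popen([sys.executable, '-m', 'testbench', '--port', '9001'])
    try:
        fs = GcsFileSystem(endpoint_override='localhost:9001',
                           scheme='http',   # the testbench speaks plain HTTP
                           anonymous=True,  # it does not check credentials
                           retry_time_limit=timedelta(seconds=5))
        fs.create_dir('my-bucket/')
    finally:
        proc.kill()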
diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py
index 4fd72704a7..4bd532525a 100644
--- a/python/pyarrow/tests/test_fs.py
+++ b/python/pyarrow/tests/test_fs.py
@@ -200,6 +200,34 @@ def subtree_localfs(request, tempdir, localfs):
)
+@pytest.fixture
+def gcsfs(request, gcs_server):
+ request.config.pyarrow.requires('gcs')
+ from pyarrow.fs import GcsFileSystem
+
+ host, port = gcs_server['connection']
+ bucket = 'pyarrow-filesystem/'
+ # Make sure the server is alive.
+ assert gcs_server['process'].poll() is None
+
+ fs = GcsFileSystem(
+ endpoint_override=f'{host}:{port}',
+ scheme='http',
+ # Mock endpoint doesn't check credentials.
+ anonymous=True,
+ retry_time_limit=timedelta(seconds=45)
+ )
+ fs.create_dir(bucket)
+
+ yield dict(
+ fs=fs,
+ pathfn=bucket.__add__,
+ allow_move_dir=False,
+ allow_append_to_file=False,
+ )
+ fs.delete_dir(bucket)
+
+
@pytest.fixture
def s3fs(request, s3_server):
request.config.pyarrow.requires('s3')
@@ -345,6 +373,11 @@ def py_fsspec_s3fs(request, s3_server):
id='S3FileSystem',
marks=pytest.mark.s3
),
+ pytest.param(
+ pytest.lazy_fixture('gcsfs'),
+ id='GcsFileSystem',
+ marks=pytest.mark.gcs
+ ),
pytest.param(
pytest.lazy_fixture('hdfs'),
id='HadoopFileSystem',
@@ -869,6 +902,10 @@ def test_open_input_file(fs, pathfn):
s.write(data)
read_from = len(b'some data') * 512
+ with fs.open_input_file(p) as f:
+ result = f.read()
+ assert result == data
+
with fs.open_input_file(p) as f:
f.seek(read_from)
result = f.read()
@@ -951,7 +988,7 @@ def test_open_output_stream_metadata(fs, pathfn):
assert f.read() == data
got_metadata = f.metadata()
- if fs.type_name == 's3' or 'mock' in fs.type_name:
+ if fs.type_name in ['s3', 'gcs'] or 'mock' in fs.type_name:
for k, v in metadata.items():
assert got_metadata[k] == v.encode()
else:
@@ -1010,6 +1047,42 @@ def test_mockfs_mtime_roundtrip(mockfs):
assert info.mtime == dt
+@pytest.mark.gcs
+def test_gcs_options():
+ from pyarrow.fs import GcsFileSystem
+ dt = datetime.now()
+ fs = GcsFileSystem(access_token='abc',
+ target_service_account='service_account@apache',
+ credential_token_expiration=dt,
+ default_bucket_location='us-west2',
+ scheme='https', endpoint_override='localhost:8999')
+ assert isinstance(fs, GcsFileSystem)
+ assert fs.default_bucket_location == 'us-west2'
+ assert pickle.loads(pickle.dumps(fs)) == fs
+
+ fs = GcsFileSystem()
+ assert isinstance(fs, GcsFileSystem)
+ assert pickle.loads(pickle.dumps(fs)) == fs
+
+ fs = GcsFileSystem(anonymous=True)
+ assert isinstance(fs, GcsFileSystem)
+ assert pickle.loads(pickle.dumps(fs)) == fs
+
+ fs = GcsFileSystem(default_metadata={"ACL": "authenticated-read",
+ "Content-Type": "text/plain"})
+ assert isinstance(fs, GcsFileSystem)
+ assert pickle.loads(pickle.dumps(fs)) == fs
+
+ with pytest.raises(ValueError):
+ GcsFileSystem(access_token='access')
+ with pytest.raises(ValueError):
+ GcsFileSystem(anonymous=True, access_token='secret')
+ with pytest.raises(ValueError):
+ GcsFileSystem(anonymous=True, target_service_account='acct')
+ with pytest.raises(ValueError):
+ GcsFileSystem(credential_token_expiration=datetime.now())
+
+
@pytest.mark.s3
def test_s3_options():
from pyarrow.fs import S3FileSystem
@@ -1321,6 +1394,26 @@ def test_filesystem_from_uri_s3(s3_server):
assert info.type == FileType.Directory
+@pytest.mark.gcs
+def test_filesystem_from_uri_gcs(gcs_server):
+ from pyarrow.fs import GcsFileSystem
+
+ host, port = gcs_server['connection']
+
+ uri = ("gs://anonymous@" +
+ f"mybucket/foo/bar?scheme=http&endpoint_override={host}:{port}&" +
+ "retry_limit_seconds=5")
+
+ fs, path = FileSystem.from_uri(uri)
+ assert isinstance(fs, GcsFileSystem)
+ assert path == "mybucket/foo/bar"
+
+ fs.create_dir(path)
+ [info] = fs.get_file_info([path])
+ assert info.path == path
+ assert info.type == FileType.Directory
+
+
def test_py_filesystem():
handler = DummyHandler()
fs = PyFileSystem(handler)
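
As the metadata assertions above indicate, GCS (like S3) surfaces per-object
metadata supplied at write time. A hedged sketch against a local testbench
(the localhost:9001 endpoint is an assumption, as in the earlier sketch):

    from pyarrow.fs import GcsFileSystem

    fs = GcsFileSystem(anonymous=True, scheme='http',
                       endpoint_override='localhost:9001')
    fs.create_dir('my-bucket/')
    with fs.open_output_stream('my-bucket/hello.txt',
                               metadata={'Content-Type': 'text/plain'}) as f:
        f.write(b'hello')
    with fs.open_input_stream('my-bucket/hello.txt') as f:
        assert f.metadata()['Content-Type'] == b'text/plain'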
diff --git a/python/pyarrow/tests/test_pandas.py b/python/pyarrow/tests/test_pandas.py
index 078714df04..143bb0e33e 100644
--- a/python/pyarrow/tests/test_pandas.py
+++ b/python/pyarrow/tests/test_pandas.py
@@ -226,7 +226,7 @@ class TestConvertMetadata:
with pytest.warns(None) as record:
_check_pandas_roundtrip(df, preserve_index=True)
- assert len(record) == 0
+ assert len(record) == 0, [r.message for r in record]
def test_multiindex_columns(self):
columns = pd.MultiIndex.from_arrays([
@@ -277,7 +277,7 @@ class TestConvertMetadata:
with pytest.warns(None) as record:
_check_pandas_roundtrip(df, preserve_index=True)
- assert len(record) == 0
+ assert len(record) == 0, [r.message for r in record]
def test_integer_index_column(self):
df = pd.DataFrame([(1, 'a'), (2, 'b'), (3, 'c')])
diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi
index 0a54b401b1..de9677f3d4 100644
--- a/python/pyarrow/types.pxi
+++ b/python/pyarrow/types.pxi
@@ -1145,7 +1145,7 @@ cdef class KeyValueMetadata(_Metadata, Mapping):
return result
-cdef KeyValueMetadata ensure_metadata(object meta, c_bool allow_none=False):
+cpdef KeyValueMetadata ensure_metadata(object meta, c_bool allow_none=False):
if allow_none and meta is None:
return None
elif isinstance(meta, KeyValueMetadata):
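
The cdef-to-cpdef change gives ensure_metadata a Python-callable wrapper so it
can be imported from other modules; the new _gcsfs.pyx imports it above for
default_metadata handling. A small illustrative check (not part of the patch):

    from pyarrow.lib import KeyValueMetadata, ensure_metadata

    meta = ensure_metadata({'Content-Type': 'text/plain'})
    assert isinstance(meta, KeyValueMetadata)
    assert ensure_metadata(None, allow_none=True) is None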
diff --git a/python/setup.py b/python/setup.py
index 2486a8959e..b572be1cee 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -113,6 +113,8 @@ class build_ext(_build_ext):
('with-parquet', None, 'build the Parquet extension'),
('with-parquet-encryption', None,
'build the Parquet encryption extension'),
+ ('with-gcs', None,
+ 'build the Google Cloud Storage (GCS) extension'),
('with-s3', None, 'build the Amazon S3 extension'),
('with-static-parquet', None, 'link parquet statically'),
('with-static-boost', None, 'link boost statically'),
@@ -155,6 +157,8 @@ class build_ext(_build_ext):
if not hasattr(sys, 'gettotalrefcount'):
self.build_type = 'release'
+ self.with_gcs = strtobool(
+ os.environ.get('PYARROW_WITH_GCS', '0'))
self.with_s3 = strtobool(
os.environ.get('PYARROW_WITH_S3', '0'))
self.with_hdfs = strtobool(
@@ -216,6 +220,7 @@ class build_ext(_build_ext):
'_parquet_encryption',
'_orc',
'_plasma',
+ '_gcsfs',
'_s3fs',
'_substrait',
'_hdfs',
@@ -281,6 +286,7 @@ class build_ext(_build_ext):
append_cmake_bool(self.with_parquet_encryption,
'PYARROW_BUILD_PARQUET_ENCRYPTION')
append_cmake_bool(self.with_plasma, 'PYARROW_BUILD_PLASMA')
+ append_cmake_bool(self.with_gcs, 'PYARROW_BUILD_GCS')
append_cmake_bool(self.with_s3, 'PYARROW_BUILD_S3')
append_cmake_bool(self.with_hdfs, 'PYARROW_BUILD_HDFS')
append_cmake_bool(self.with_tensorflow, 'PYARROW_USE_TENSORFLOW')
@@ -447,6 +453,8 @@ class build_ext(_build_ext):
return True
if name == '_substrait' and not self.with_substrait:
return True
+ if name == '_gcsfs' and not self.with_gcs:
+ return True
if name == '_s3fs' and not self.with_s3:
return True
if name == '_hdfs' and not self.with_hdfs: