pitrou commented on code in PR #39067: URL: https://github.com/apache/arrow/pull/39067#discussion_r1511356168
########## cpp/examples/arrow/filesystem_usage_example.cc: ########## @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <iostream> + +#include <arrow/filesystem/filesystem.h> +#include <arrow/result.h> + +namespace fs = arrow::fs; + +// Demonstrate dynamically loading a user-defined Arrow FileSystem + +arrow::Status Execute() { + ARROW_RETURN_NOT_OK(arrow::fs::LoadFileSystemFactories(LIBPATH)); Review Comment: I think we should make the preprocessor variable a bit more specific, both for readability and to avoid name clashes. Something like `FILESYSTEM_EXAMPLE_LIBPATH`? ########## cpp/src/arrow/filesystem/examplefs.cc: ########## @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one Review Comment: If this file is testing-specific (is it?), I would recommend it go under `arrow/testing`. 
########## cpp/src/arrow/filesystem/filesystem.cc: ########## @@ -674,6 +684,134 @@ Status CopyFiles(const std::shared_ptr<FileSystem>& source_fs, return CopyFiles(sources, destinations, io_context, chunk_size, use_threads); } +class FileSystemFactoryRegistry { + public: + static FileSystemFactoryRegistry* GetInstance() { + static FileSystemFactoryRegistry registry; + return &registry; + } + + Result<FileSystemFactory*> FactoryForScheme(const std::string& scheme) { + std::shared_lock lock{mutex_}; + if (finalized_) return AlreadyFinalized(); + + auto it = scheme_to_factory_.find(scheme); + if (it == scheme_to_factory_.end()) return nullptr; + + return it->second.Map([](const auto& r) { return r.factory; }); + } + + Status MergeInto(FileSystemFactoryRegistry* main_registry) { + std::shared_lock lock{mutex_}; + if (finalized_) return AlreadyFinalized(); + + auto& [main_mutex, main_scheme_to_factory, _] = *main_registry; + std::unique_lock main_lock{main_mutex}; + + std::vector<std::string_view> duplicated_schemes; + for (auto& [scheme, registered] : scheme_to_factory_) { + if (!registered.ok()) { + duplicated_schemes.emplace_back(scheme); + continue; + } + + auto [it, success] = main_scheme_to_factory.emplace(std::move(scheme), registered); + if (success) continue; + + duplicated_schemes.emplace_back(it->first); + } + scheme_to_factory_.clear(); + + if (duplicated_schemes.empty()) return Status::OK(); + return Status::KeyError("Attempted to register ", duplicated_schemes.size(), + " factories for schemes ['", + arrow::internal::JoinStrings(duplicated_schemes, "', '"), + "'] but those schemes were already registered."); + } + + void EnsureFinalized() { + std::unique_lock lock{mutex_}; + if (finalized_) return; + + for (const auto& [_, registered_or_error] : scheme_to_factory_) { + if (!registered_or_error.ok()) continue; + registered_or_error->finalizer(); + } + finalized_ = true; + } + + Status RegisterFactory(std::string scheme, FileSystemFactory factory, void
finalizer(), + bool defer_error) { + std::unique_lock lock{mutex_}; + if (finalized_) return AlreadyFinalized(); + + auto [it, success] = + scheme_to_factory_.emplace(std::move(scheme), Registered{factory, finalizer}); + if (success) { + return Status::OK(); + } + + auto st = Status::KeyError("Attempted to register factory for scheme '", it->first, + "' but that scheme is already registered."); + if (!defer_error) return st; + + it->second = std::move(st); + return Status::OK(); + } + + private: + struct Registered { + FileSystemFactory* factory; + void (*finalizer)(); + }; + + static Status AlreadyFinalized() { + return Status::Invalid("FileSystem factories were already finalized!"); + } + + std::shared_mutex mutex_; + std::unordered_map<std::string, Result<Registered>> scheme_to_factory_; + bool finalized_ = false; +}; + +Status RegisterFileSystemFactory(std::string scheme, FileSystemFactory factory, + void finalizer()) { + return FileSystemFactoryRegistry::GetInstance()->RegisterFactory( + std::move(scheme), factory, finalizer, /*defer_error=*/false); +} + +void EnsureFinalized() { FileSystemFactoryRegistry::GetInstance()->EnsureFinalized(); } + +FileSystemRegistrar::FileSystemRegistrar(std::string scheme, FileSystemFactory factory, + void finalizer()) { + DCHECK_OK(FileSystemFactoryRegistry::GetInstance()->RegisterFactory( + std::move(scheme), std::move(factory), finalizer, /*defer_error=*/true)); +} + +extern "C" { +ARROW_EXPORT void* Arrow_FileSystem_GetRegistry() { + return FileSystemFactoryRegistry::GetInstance(); +} +} +constexpr auto kGetRegistryName = "Arrow_FileSystem_GetRegistry"; + +Status LoadFileSystemFactories(const char* libpath) { + using ::arrow::internal::GetSymbolAs; + using ::arrow::internal::LoadDynamicLibrary; + + ARROW_ASSIGN_OR_RAISE(void* lib, LoadDynamicLibrary(libpath)); + + if (auto* get_instance = GetSymbolAs<void*()>(lib, kGetRegistryName).ValueOr(nullptr)) { Review Comment: I'm curious: why is it ok to fail if the symbol is not 
found in the library? ########## cpp/src/arrow/filesystem/filesystem.cc: ########## @@ -674,6 +684,134 @@ Status CopyFiles(const std::shared_ptr<FileSystem>& source_fs, return CopyFiles(sources, destinations, io_context, chunk_size, use_threads); } +class FileSystemFactoryRegistry { + public: + static FileSystemFactoryRegistry* GetInstance() { + static FileSystemFactoryRegistry registry; + return &registry; + } + + Result<FileSystemFactory*> FactoryForScheme(const std::string& scheme) { + std::shared_lock lock{mutex_}; + if (finalized_) return AlreadyFinalized(); + + auto it = scheme_to_factory_.find(scheme); + if (it == scheme_to_factory_.end()) return nullptr; + + return it->second.Map([](const auto& r) { return r.factory; }); + } + + Status MergeInto(FileSystemFactoryRegistry* main_registry) { + std::shared_lock lock{mutex_}; Review Comment: Is it ok to take a shared lock here even though we'll be mutating `scheme_to_factory_`? ########## cpp/src/arrow/filesystem/filesystem.h: ########## @@ -519,6 +568,84 @@ Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath( /// @} +/// \defgroup filesystem-factory-registration Helpers for FileSystem registration +/// +/// @{ + +/// \brief Register a Filesystem factory +/// +/// Support for custom Uri schemes can be added by registering a factory +/// for the corresponding FileSystem. +/// +/// \param[in] scheme a Uri scheme which the factory will handle. +/// If a factory has already been registered for a scheme, +/// the new factory will be ignored. +/// \param[in] factory a function which can produce a FileSystem for Uris which match +/// scheme. +/// \param[in] finalizer a function which must be called to finalize the factory before +/// the process exits, or nullptr if no finalization is necessary. +/// \return raises KeyError if a name collision occurs.
+ARROW_EXPORT Status RegisterFileSystemFactory(std::string scheme, + FileSystemFactory factory, + void finalizer() = NULLPTR); + +/// \brief Register Filesystem factories from a shared library +/// +/// The library should register factories as part of its initialization. Review Comment: This description does not make it obvious who/what this function is for. It seems that it should be used only for the built-in filesystem factories, not third party ones? The docstring should be a bit more explicit. ########## cpp/src/arrow/filesystem/localfs_test.cc: ########## @@ -86,6 +84,88 @@ Result<std::shared_ptr<FileSystem>> FSFromUriOrPath(const std::string& uri, //////////////////////////////////////////////////////////////////////////// // Misc tests +Result<std::shared_ptr<FileSystem>> SlowFileSystemFactory(const Uri& uri, + const io::IOContext& io_context, + std::string* out_path) { + auto local_uri = "file" + uri.ToString().substr(uri.scheme().size()); + ARROW_ASSIGN_OR_RAISE(auto base_fs, FileSystemFromUri(local_uri, io_context, out_path)); + double average_latency = 1; + int32_t seed = 0xDEADBEEF; + ARROW_ASSIGN_OR_RAISE(auto params, uri.query_items()); + for (const auto& [key, value] : params) { + if (key == "average_latency") { + average_latency = std::stod(value); + } + if (key == "seed") { + seed = std::stoi(value, nullptr, /*base=*/16); + } + } + return std::make_shared<SlowFileSystem>(base_fs, average_latency, seed); +} +FileSystemRegistrar kSlowFileSystemModule{ + "slowfile", + SlowFileSystemFactory, +}; + +TEST(FileSystemFromUri, LinkedRegisteredFactory) { + // Since the registrar's definition is in this translation unit (which is linked to the + // unit test executable), its factory will be registered be loaded automatically before + // main() is entered. 
+ std::string path; + ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFromUri("slowfile:///hey/yo", &path)); + EXPECT_EQ(path, "/hey/yo"); + EXPECT_EQ(fs->type_name(), "slow"); +} + +TEST(FileSystemFromUri, LoadedRegisteredFactory) { + // Since the registrar's definition is in libarrow_filesystem_example.so, + // its factory will be registered only after the library is dynamically loaded. + std::string path; + EXPECT_THAT(FileSystemFromUri("example:///hey/yo", &path), Raises(StatusCode::Invalid)); + + EXPECT_THAT(LoadFileSystemFactories(ARROW_FILESYSTEM_EXAMPLE_LIBPATH), Ok()); + + ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFromUri("example:///hey/yo", &path)); + EXPECT_EQ(path, "/hey/yo"); + EXPECT_EQ(fs->type_name(), "local"); +} + +TEST(FileSystemFromUri, RuntimeRegisteredFactory) { + std::string path; + EXPECT_THAT(FileSystemFromUri("slowfile2:///hey/yo", &path), + Raises(StatusCode::Invalid)); + + EXPECT_THAT(RegisterFileSystemFactory("slowfile2", SlowFileSystemFactory), Ok()); + + ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFromUri("slowfile2:///hey/yo", &path)); + EXPECT_EQ(path, "/hey/yo"); + EXPECT_EQ(fs->type_name(), "slow"); +} + +TEST(FileSystemFromUri, RuntimeRegisteredFactoryNameCollision) { Review Comment: You should fold this test into the previous one, otherwise there is no guarantee that both will be run and in the right order. ########## docs/source/cpp/io.rst: ########## @@ -91,4 +101,42 @@ Concrete implementations are available for Tasks that use filesystems will typically run on the :ref:`I/O thread pool<io_thread_pool>`. For filesystems that support high levels - of concurrency you may get a benefit from increasing the size of the I/O thread pool. \ No newline at end of file + of concurrency you may get a benefit from increasing the size of the I/O thread pool.
+ +Defining new FileSystems +======================== + +Build complexity can be decreased by compartmentalizing a FileSystem +implementation into a separate shared library, which applications may +link or load dynamically. Arrow's built-in FileSystem implementations +also follow this pattern. Before a scheme can be used with any of the +`High-level factory functions`_ the library which contains it must be Review Comment: I don't think "High-level factory functions" will link to anything as there does not seem to be any such heading in this document. You may want to create a `ref` and corresponding target. ########## cpp/src/arrow/filesystem/filesystem.h: ########## @@ -458,16 +490,25 @@ class ARROW_EXPORT SlowFileSystem : public FileSystem { const FileInfo& info) override; Result<std::shared_ptr<io::OutputStream>> OpenOutputStream( const std::string& path, - const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override; + const std::shared_ptr<const KeyValueMetadata>& metadata) override; Result<std::shared_ptr<io::OutputStream>> OpenAppendStream( const std::string& path, - const std::shared_ptr<const KeyValueMetadata>& metadata = {}) override; + const std::shared_ptr<const KeyValueMetadata>& metadata) override; protected: std::shared_ptr<FileSystem> base_fs_; std::shared_ptr<io::LatencyGenerator> latencies_; }; +/// \brief Ensure all registered filesystem implementations are finalized. +/// +/// Individual finalizers may wait for concurrent calls to finish so as to avoid +/// race conditions. After this function has been called, all filesystem APIs +/// will fail with an error. +/// +/// The user is responsible for synchronization of calls to this function. +void EnsureFinalized(); Review Comment: Is there a plan for this to also handle built-in filesystems such as S3? Otherwise I think the name should be a bit more specific (and probably more verbose unfortunately). 
########## cpp/src/arrow/filesystem/filesystem.h: ########## @@ -519,6 +568,84 @@ Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath( /// @} +/// \defgroup filesystem-factory-registration Helpers for FileSystem registration +/// +/// @{ + +/// \brief Register a Filesystem factory +/// +/// Support for custom Uri schemes can be added by registering a factory +/// for the corresponding FileSystem. +/// +/// \param[in] scheme a Uri scheme which the factory will handle. +/// If a factory has already been registered for a scheme, +/// the new factory will be ignored. +/// \param[in] factory a function which can produce a FileSystem for Uris which match +/// scheme. +/// \param[in] finalizer a function which must be called to finalize the factory before +/// the process exits, or nullptr if no finalization is necessary. +/// \return raises KeyError if a name collision occurs. +ARROW_EXPORT Status RegisterFileSystemFactory(std::string scheme, + FileSystemFactory factory, + void finalizer() = NULLPTR); Review Comment: Still not sure why we're not taking a `std::function` here. We won't have thousands of them... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
