bkietz commented on code in PR #38269:
URL: https://github.com/apache/arrow/pull/38269#discussion_r1364067519
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -37,34 +43,330 @@ bool AzureOptions::Equals(const AzureOptions& other) const
{
credentials_kind == other.credentials_kind);
}
+Status AzureOptions::ConfigureAccountKeyCredentials(const std::string&
account_name,
+ const std::string&
account_key) {
+ if (this->backend == AzureBackend::Azurite) {
+ account_blob_url = "http://127.0.0.1:10000/" + account_name + "/";
+ account_dfs_url = "http://127.0.0.1:10000/" + account_name + "/";
+ } else {
+ account_dfs_url = "https://" + account_name + ".dfs.core.windows.net/";
+ account_blob_url = "https://" + account_name + ".blob.core.windows.net/";
+ }
+ storage_credentials_provider =
+
std::make_shared<Azure::Storage::StorageSharedKeyCredential>(account_name,
+
account_key);
+ credentials_kind = AzureCredentialsKind::StorageCredentials;
+ return Status::OK();
+}
+namespace {
+
+// An AzureFileSystem represents a single Azure storage account. AzurePath
describes a
+// container and path within that storage account.
+struct AzurePath {
+ std::string full_path;
+ std::string container;
+ std::string path_to_file;
+ std::vector<std::string> path_to_file_parts;
+
+ static Result<AzurePath> FromString(const std::string& s) {
+ // Example expected string format: testcontainer/testdir/testfile.txt
+ // container = testcontainer
+ // path_to_file = testdir/testfile.txt
+ // path_to_file_parts = [testdir, testfile.txt]
+ if (internal::IsLikelyUri(s)) {
+ return Status::Invalid(
+ "Expected an Azure object path of the form 'container/path...', got
a URI: '",
+ s, "'");
+ }
+ auto src = internal::RemoveTrailingSlash(s);
+ auto input_path = std::string(src.data());
+ src = internal::RemoveLeadingSlash(src);
+ auto first_sep = src.find_first_of(internal::kSep);
+ if (first_sep == 0) {
+ return Status::Invalid("Path cannot start with a separator ('",
input_path, "')");
+ }
+ if (first_sep == std::string::npos) {
+ return AzurePath{std::string(src), std::string(src), "", {}};
+ }
+ AzurePath path;
+ path.full_path = std::string(src);
+ path.container = std::string(src.substr(0, first_sep));
+ path.path_to_file = std::string(src.substr(first_sep + 1));
+ path.path_to_file_parts = internal::SplitAbstractPath(path.path_to_file);
+ RETURN_NOT_OK(Validate(path));
+ return path;
+ }
+
+ static Status Validate(const AzurePath& path) {
+ auto status = internal::ValidateAbstractPathParts(path.path_to_file_parts);
+ if (!status.ok()) {
+ return Status::Invalid(status.message(), " in path ", path.full_path);
+ } else {
+ return status;
+ }
+ }
+
+ AzurePath parent() const {
+ DCHECK(has_parent());
+ auto parent = AzurePath{"", container, "", path_to_file_parts};
+ parent.path_to_file_parts.pop_back();
+ parent.path_to_file =
internal::JoinAbstractPath(parent.path_to_file_parts);
+ if (parent.path_to_file.empty()) {
+ parent.full_path = parent.container;
+ } else {
+ parent.full_path = parent.container + internal::kSep +
parent.path_to_file;
+ }
+ return parent;
+ }
+
+ bool has_parent() const { return !path_to_file.empty(); }
+
+ bool empty() const { return container.empty() && path_to_file.empty(); }
+
+ bool operator==(const AzurePath& other) const {
+ return container == other.container && path_to_file == other.path_to_file;
+ }
+};
+
+Status PathNotFound(const AzurePath& path) {
+ return ::arrow::fs::internal::PathNotFound(path.full_path);
+}
+
+Status NotAFile(const AzurePath& path) {
+ return ::arrow::fs::internal::NotAFile(path.full_path);
+}
+
+Status ValidateFilePath(const AzurePath& path) {
+ if (path.container.empty()) {
+ return PathNotFound(path);
+ }
+
+ if (path.path_to_file.empty()) {
+ return NotAFile(path);
+ }
+ return Status::OK();
+}
+
+Status ErrorToStatus(const std::string& prefix,
+ const Azure::Storage::StorageException& exception) {
+ return Status::IOError(prefix, " Azure Error: ", exception.what());
+}
+
+template <typename ObjectResult>
+std::shared_ptr<const KeyValueMetadata> GetObjectMetadata(const ObjectResult&
result) {
+ auto md = std::make_shared<KeyValueMetadata>();
+ for (auto prop : result) {
+ md->Append(prop.first, prop.second);
+ }
+ return md;
+}
+
+class ObjectInputFile final : public io::RandomAccessFile {
+ public:
+ ObjectInputFile(std::shared_ptr<Azure::Storage::Blobs::BlobClient>
blob_client,
+ const io::IOContext& io_context, const AzurePath& path,
+ int64_t size = kNoSize)
+ : blob_client_(std::move(blob_client)),
+ io_context_(io_context),
+ path_(path),
Review Comment:
+1 to https://github.com/apache/arrow/pull/38269/files#r1361295168 (take `path` by value and move it into `path_`):
```suggestion
ObjectInputFile(std::shared_ptr<Azure::Storage::Blobs::BlobClient>
blob_client,
const io::IOContext& io_context, AzurePath path,
int64_t size = kNoSize)
: blob_client_(std::move(blob_client)),
io_context_(io_context),
path_(std::move(path)),
```
##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -154,6 +137,330 @@ TEST(AzureFileSystem, OptionsCompare) {
EXPECT_TRUE(options.Equals(options));
}
+class TestAzureFileSystem : public ::testing::Test {
+ public:
+ std::shared_ptr<FileSystem> fs_;
+ std::shared_ptr<Azure::Storage::Blobs::BlobServiceClient> service_client_;
+ std::mt19937_64 generator_;
+ std::string container_name_;
+
+ TestAzureFileSystem() : generator_(std::random_device()()) {}
+
+ AzureOptions MakeOptions() {
+ const std::string& account_name = GetAzuriteEnv()->account_name();
+ const std::string& account_key = GetAzuriteEnv()->account_key();
+ AzureOptions options;
+ options.backend = AzureBackend::Azurite;
+ ARROW_EXPECT_OK(options.ConfigureAccountKeyCredentials(account_name,
account_key));
+ return options;
+ }
+
+ void SetUp() override {
+ ASSERT_THAT(GetAzuriteEnv(), NotNull());
+ ASSERT_OK(GetAzuriteEnv()->status());
+
+ container_name_ = RandomChars(32);
+ auto options = MakeOptions();
+ service_client_ =
std::make_shared<Azure::Storage::Blobs::BlobServiceClient>(
+ options.account_blob_url, options.storage_credentials_provider);
+ ASSERT_OK_AND_ASSIGN(fs_, AzureFileSystem::Make(options));
+ auto container_client =
service_client_->GetBlobContainerClient(container_name_);
+ container_client.CreateIfNotExists();
+
+ auto blob_client =
container_client.GetBlockBlobClient(PreexistingObjectName());
+ blob_client.UploadFrom(reinterpret_cast<const uint8_t*>(kLoremIpsum),
+ strlen(kLoremIpsum));
+ }
+
+ void TearDown() override {
+ auto containers = service_client_->ListBlobContainers();
+ for (auto container : containers.BlobContainers) {
+ auto container_client =
service_client_->GetBlobContainerClient(container.Name);
+ container_client.DeleteIfExists();
+ }
+ }
+
+ std::string PreexistingContainerName() const { return container_name_; }
+
+ std::string PreexistingContainerPath() const {
+ return PreexistingContainerName() + '/';
+ }
+
+ static std::string PreexistingObjectName() { return "test-object-name"; }
+
+ std::string PreexistingObjectPath() const {
+ return PreexistingContainerPath() + PreexistingObjectName();
+ }
+
+ std::string NotFoundObjectPath() { return PreexistingContainerPath() +
"not-found"; }
+
+ std::string RandomLine(int lineno, std::size_t width) {
+ auto line = std::to_string(lineno) + ": ";
+ line += RandomChars(width - line.size() - 1);
+ line += '\n';
+ return line;
+ }
+
+ std::size_t RandomIndex(std::size_t end) {
+ return std::uniform_int_distribution<std::size_t>(0, end - 1)(generator_);
+ }
+
+ std::string RandomChars(std::size_t count) {
+ auto const fillers = std::string("abcdefghijlkmnopqrstuvwxyz0123456789");
+ std::uniform_int_distribution<std::size_t> d(0, fillers.size() - 1);
+ std::string s;
+ std::generate_n(std::back_inserter(s), count, [&] { return
fillers[d(generator_)]; });
+ return s;
+ }
+
+ void UploadLines(const std::vector<std::string>& lines, const char*
path_to_file,
+ int total_size) {
+ // TODO: Switch to using Azure filesystem to write once its implemented.
+ auto blob_client =
service_client_->GetBlobContainerClient(PreexistingContainerName())
+ .GetBlockBlobClient(path_to_file);
+ std::string all_lines = std::accumulate(lines.begin(), lines.end(),
std::string(""));
+ blob_client.UploadFrom(reinterpret_cast<const uint8_t*>(all_lines.data()),
+ total_size);
+ }
+};
+
+TEST_F(TestAzureFileSystem, OpenInputStreamString) {
+ std::shared_ptr<io::InputStream> stream;
+ ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));
+
+ std::array<char, 1024> buffer{};
+ std::int64_t size;
+ ASSERT_OK_AND_ASSIGN(size, stream->Read(buffer.size(), buffer.data()));
+
+ EXPECT_EQ(std::string(buffer.data(), size), kLoremIpsum);
Review Comment:
For test cases, I think it's preferable to reduce boilerplate by using the
Buffer-returning functions:
```suggestion
ASSERT_OK_AND_ASSIGN(auto buffer, stream->Read(1024));
EXPECT_EQ(buffer->ToString(), kLoremIpsum);
```
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -37,34 +43,330 @@ bool AzureOptions::Equals(const AzureOptions& other) const
{
credentials_kind == other.credentials_kind);
}
+Status AzureOptions::ConfigureAccountKeyCredentials(const std::string&
account_name,
+ const std::string&
account_key) {
+ if (this->backend == AzureBackend::Azurite) {
+ account_blob_url = "http://127.0.0.1:10000/" + account_name + "/";
+ account_dfs_url = "http://127.0.0.1:10000/" + account_name + "/";
+ } else {
+ account_dfs_url = "https://" + account_name + ".dfs.core.windows.net/";
+ account_blob_url = "https://" + account_name + ".blob.core.windows.net/";
+ }
+ storage_credentials_provider =
+
std::make_shared<Azure::Storage::StorageSharedKeyCredential>(account_name,
+
account_key);
+ credentials_kind = AzureCredentialsKind::StorageCredentials;
+ return Status::OK();
+}
+namespace {
+
+// An AzureFileSystem represents a single Azure storage account. AzurePath
describes a
+// container and path within that storage account.
+struct AzurePath {
+ std::string full_path;
+ std::string container;
+ std::string path_to_file;
+ std::vector<std::string> path_to_file_parts;
+
+ static Result<AzurePath> FromString(const std::string& s) {
+ // Example expected string format: testcontainer/testdir/testfile.txt
+ // container = testcontainer
+ // path_to_file = testdir/testfile.txt
+ // path_to_file_parts = [testdir, testfile.txt]
+ if (internal::IsLikelyUri(s)) {
+ return Status::Invalid(
+ "Expected an Azure object path of the form 'container/path...', got
a URI: '",
+ s, "'");
+ }
+ auto src = internal::RemoveTrailingSlash(s);
+ auto input_path = std::string(src.data());
+ src = internal::RemoveLeadingSlash(src);
+ auto first_sep = src.find_first_of(internal::kSep);
+ if (first_sep == 0) {
+ return Status::Invalid("Path cannot start with a separator ('",
input_path, "')");
+ }
+ if (first_sep == std::string::npos) {
+ return AzurePath{std::string(src), std::string(src), "", {}};
+ }
+ AzurePath path;
+ path.full_path = std::string(src);
+ path.container = std::string(src.substr(0, first_sep));
+ path.path_to_file = std::string(src.substr(first_sep + 1));
+ path.path_to_file_parts = internal::SplitAbstractPath(path.path_to_file);
+ RETURN_NOT_OK(Validate(path));
+ return path;
+ }
+
+ static Status Validate(const AzurePath& path) {
+ auto status = internal::ValidateAbstractPathParts(path.path_to_file_parts);
+ if (!status.ok()) {
+ return Status::Invalid(status.message(), " in path ", path.full_path);
+ } else {
+ return status;
+ }
+ }
+
+ AzurePath parent() const {
+ DCHECK(has_parent());
+ auto parent = AzurePath{"", container, "", path_to_file_parts};
+ parent.path_to_file_parts.pop_back();
+ parent.path_to_file =
internal::JoinAbstractPath(parent.path_to_file_parts);
+ if (parent.path_to_file.empty()) {
+ parent.full_path = parent.container;
+ } else {
+ parent.full_path = parent.container + internal::kSep +
parent.path_to_file;
+ }
+ return parent;
+ }
+
+ bool has_parent() const { return !path_to_file.empty(); }
+
+ bool empty() const { return container.empty() && path_to_file.empty(); }
+
+ bool operator==(const AzurePath& other) const {
+ return container == other.container && path_to_file == other.path_to_file;
+ }
+};
+
+Status PathNotFound(const AzurePath& path) {
+ return ::arrow::fs::internal::PathNotFound(path.full_path);
+}
+
+Status NotAFile(const AzurePath& path) {
+ return ::arrow::fs::internal::NotAFile(path.full_path);
+}
+
+Status ValidateFilePath(const AzurePath& path) {
+ if (path.container.empty()) {
+ return PathNotFound(path);
+ }
+
+ if (path.path_to_file.empty()) {
+ return NotAFile(path);
+ }
+ return Status::OK();
+}
+
+Status ErrorToStatus(const std::string& prefix,
+ const Azure::Storage::StorageException& exception) {
+ return Status::IOError(prefix, " Azure Error: ", exception.what());
+}
+
+template <typename ObjectResult>
+std::shared_ptr<const KeyValueMetadata> GetObjectMetadata(const ObjectResult&
result) {
+ auto md = std::make_shared<KeyValueMetadata>();
+ for (auto prop : result) {
+ md->Append(prop.first, prop.second);
+ }
+ return md;
+}
+
+class ObjectInputFile final : public io::RandomAccessFile {
+ public:
+ ObjectInputFile(std::shared_ptr<Azure::Storage::Blobs::BlobClient>
blob_client,
+ const io::IOContext& io_context, const AzurePath& path,
+ int64_t size = kNoSize)
+ : blob_client_(std::move(blob_client)),
+ io_context_(io_context),
+ path_(path),
+ content_length_(size) {}
+
+ Status Init() {
+ if (content_length_ != kNoSize) {
+ DCHECK_GE(content_length_, 0);
+ return Status::OK();
+ }
+ try {
+ auto properties = blob_client_->GetProperties();
+ content_length_ = properties.Value.BlobSize;
+ metadata_ = GetObjectMetadata(properties.Value.Metadata);
+ return Status::OK();
+ } catch (const Azure::Storage::StorageException& exception) {
+ if (exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound)
{
+ // Could be either container or blob not found.
+ return PathNotFound(path_);
+ }
+ return ErrorToStatus(
+ "When fetching properties for '" + blob_client_->GetUrl() + "': ",
exception);
+ }
+ }
+
+ Status CheckClosed(const char* action) const {
+ if (closed_) {
+ return Status::Invalid("Cannot ", action, " on closed file.");
+ }
+ return Status::OK();
+ }
+
+ Status CheckPosition(int64_t position, const char* action) const {
+ DCHECK_GE(content_length_, 0);
+ if (position < 0) {
+ return Status::Invalid("Cannot ", action, " from negative position");
+ }
+ if (position > content_length_) {
+ return Status::IOError("Cannot ", action, " past end of file");
+ }
+ return Status::OK();
+ }
+
+ // RandomAccessFile APIs
+
+ Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata() override {
+ return metadata_;
+ }
+
+ Future<std::shared_ptr<const KeyValueMetadata>> ReadMetadataAsync(
+ const io::IOContext& io_context) override {
+ return metadata_;
+ }
+
+ Status Close() override {
+ blob_client_ = nullptr;
+ closed_ = true;
+ return Status::OK();
+ }
+
+ bool closed() const override { return closed_; }
+
+ Result<int64_t> Tell() const override {
+ RETURN_NOT_OK(CheckClosed("tell"));
+ return pos_;
+ }
+
+ Result<int64_t> GetSize() override {
+ RETURN_NOT_OK(CheckClosed("size"));
+ return content_length_;
+ }
+
+ Status Seek(int64_t position) override {
+ RETURN_NOT_OK(CheckClosed("seek"));
+ RETURN_NOT_OK(CheckPosition(position, "seek"));
+
+ pos_ = position;
+ return Status::OK();
+ }
+
+ Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override
{
+ RETURN_NOT_OK(CheckClosed("read"));
+ RETURN_NOT_OK(CheckPosition(position, "read"));
+
+ nbytes = std::min(nbytes, content_length_ - position);
+ if (nbytes == 0) {
+ return 0;
+ }
+
+ // Read the desired range of bytes
+ Azure::Core::Http::HttpRange range{.Offset = position, .Length = nbytes};
+ Azure::Storage::Blobs::DownloadBlobToOptions download_options{.Range =
range};
+ try {
+ return blob_client_
+ ->DownloadTo(reinterpret_cast<uint8_t*>(out), nbytes,
download_options)
+ .Value.ContentRange.Length.Value();
+ } catch (const Azure::Storage::StorageException& exception) {
+ return ErrorToStatus("When reading from '" + blob_client_->GetUrl() +
+ "' at position " + std::to_string(position) + "
for " +
+ std::to_string(nbytes) + " bytes: ",
+ exception);
+ }
+ }
+
+ Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes)
override {
+ RETURN_NOT_OK(CheckClosed("read"));
+ RETURN_NOT_OK(CheckPosition(position, "read"));
+
+ // No need to allocate more than the remaining number of bytes
+ nbytes = std::min(nbytes, content_length_ - position);
+
+ ARROW_ASSIGN_OR_RAISE(auto buffer,
+ AllocateResizableBuffer(nbytes, io_context_.pool()));
+ if (nbytes > 0) {
+ ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
+ ReadAt(position, nbytes, buffer->mutable_data()));
+ DCHECK_LE(bytes_read, nbytes);
+ RETURN_NOT_OK(buffer->Resize(bytes_read));
+ }
+ return std::move(buffer);
+ }
+
+ Result<int64_t> Read(int64_t nbytes, void* out) override {
+ ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out));
+ pos_ += bytes_read;
+ return bytes_read;
+ }
+
+ Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
+ ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes));
+ pos_ += buffer->size();
+ return std::move(buffer);
+ }
+
+ private:
+ std::shared_ptr<Azure::Storage::Blobs::BlobClient> blob_client_;
+ const io::IOContext io_context_;
+ AzurePath path_;
+
+ bool closed_ = false;
+ int64_t pos_ = 0;
+ int64_t content_length_ = kNoSize;
+ std::shared_ptr<const KeyValueMetadata> metadata_;
+};
+
+} // namespace
+
// -----------------------------------------------------------------------
// AzureFilesystem Implementation
class AzureFileSystem::Impl {
public:
io::IOContext io_context_;
- bool is_hierarchical_namespace_enabled_;
+ std::shared_ptr<Azure::Storage::Blobs::BlobServiceClient> service_client_;
AzureOptions options_;
explicit Impl(AzureOptions options, io::IOContext io_context)
: io_context_(io_context), options_(std::move(options)) {}
Status Init() {
- // TODO: GH-18014 Delete this once we have a proper implementation. This
just
- // initializes a pointless Azure blob service client with a fake endpoint
to ensure
- // the build will fail if the Azure SDK build is broken.
- auto default_credential =
std::make_shared<Azure::Identity::DefaultAzureCredential>();
- auto service_client = Azure::Storage::Blobs::BlobServiceClient(
- "http://fake-blob-storage-endpoint", default_credential);
- if (options_.backend == AzureBackend::Azurite) {
- // gen1Client_->GetAccountInfo().Value.IsHierarchicalNamespaceEnabled
- // throws error in azurite
- is_hierarchical_namespace_enabled_ = false;
- }
+ service_client_ =
std::make_shared<Azure::Storage::Blobs::BlobServiceClient>(
+ options_.account_blob_url, options_.storage_credentials_provider);
return Status::OK();
}
const AzureOptions& options() const { return options_; }
+
+ Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const std::string& s,
+ AzureFileSystem* fs) {
+ ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(s));
+ ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(s));
+ RETURN_NOT_OK(ValidateFilePath(path));
+ auto blob_client = std::make_shared<Azure::Storage::Blobs::BlobClient>(
+ service_client_->GetBlobContainerClient(path.container)
+ .GetBlobClient(path.path_to_file));
+
+ auto ptr = std::make_shared<ObjectInputFile>(blob_client,
fs->io_context(), path);
+ RETURN_NOT_OK(ptr->Init());
+ return ptr;
+ }
+
+ Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const FileInfo& info,
+ AzureFileSystem* fs) {
+ ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(info.path()));
+ if (info.type() == FileType::NotFound) {
+ return ::arrow::fs::internal::PathNotFound(info.path());
+ }
+ if (info.type() != FileType::File && info.type() != FileType::Unknown) {
+ return ::arrow::fs::internal::NotAFile(info.path());
+ }
+ ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(info.path()));
+ RETURN_NOT_OK(ValidateFilePath(path));
+ auto blob_client = std::make_shared<Azure::Storage::Blobs::BlobClient>(
+ service_client_->GetBlobContainerClient(path.container)
+ .GetBlobClient(path.path_to_file));
+
+ auto ptr = std::make_shared<ObjectInputFile>(blob_client,
fs->io_context(), path,
+ info.size());
Review Comment:
```suggestion
auto ptr = std::make_shared<ObjectInputFile>(blob_client,
fs->io_context(), std::move(path),
info.size());
```
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -37,34 +43,331 @@ bool AzureOptions::Equals(const AzureOptions& other) const
{
credentials_kind == other.credentials_kind);
}
+Status AzureOptions::ConfigureAccountKeyCredentials(const std::string&
account_name,
+ const std::string&
account_key) {
+ if (this->backend == AzureBackend::Azurite) {
+ account_blob_url = "http://127.0.0.1:10000/" + account_name + "/";
+ account_dfs_url = "http://127.0.0.1:10000/" + account_name + "/";
+ } else {
+ account_dfs_url = "https://" + account_name + ".dfs.core.windows.net/";
+ account_blob_url = "https://" + account_name + ".blob.core.windows.net/";
+ }
+ storage_credentials_provider =
+
std::make_shared<Azure::Storage::StorageSharedKeyCredential>(account_name,
+
account_key);
+ credentials_kind = AzureCredentialsKind::StorageCredentials;
+ return Status::OK();
+}
+namespace {
+
+// An AzureFileSystem represents a single Azure storage account. AzurePath
describes a
+// container and path within that storage account.
+struct AzurePath {
+ std::string full_path;
+ std::string container;
+ std::string path_to_file;
+ std::vector<std::string> path_to_file_parts;
+
+ static Result<AzurePath> FromString(const std::string& s) {
+ // Example expected string format: testcontainer/testdir/testfile.txt
+ // container = testcontainer
+ // path_to_file = testdir/testfile.txt
+ // path_to_file_parts = [testdir, testfile.txt]
+ if (internal::IsLikelyUri(s)) {
+ return Status::Invalid(
+ "Expected an Azure object path of the form 'container/path...', got
a URI: '",
+ s, "'");
+ }
+ auto src = internal::RemoveTrailingSlash(s);
+ auto input_path = std::string(src.data());
+ src = internal::RemoveLeadingSlash(src);
+ auto first_sep = src.find_first_of(internal::kSep);
+ if (first_sep == 0) {
+ return Status::Invalid("Path cannot start with a separator ('",
input_path, "')");
+ }
+ if (first_sep == std::string::npos) {
+ return AzurePath{std::string(src), std::string(src), "", {}};
+ }
+ AzurePath path;
+ path.full_path = std::string(src);
+ path.container = std::string(src.substr(0, first_sep));
+ path.path_to_file = std::string(src.substr(first_sep + 1));
+ path.path_to_file_parts = internal::SplitAbstractPath(path.path_to_file);
+ RETURN_NOT_OK(Validate(path));
+ return path;
+ }
+
+ static Status Validate(const AzurePath& path) {
+ auto status = internal::ValidateAbstractPathParts(path.path_to_file_parts);
+ if (!status.ok()) {
+ return Status::Invalid(status.message(), " in path ", path.full_path);
+ } else {
+ return status;
+ }
+ }
+
+ AzurePath parent() const {
+ DCHECK(has_parent());
+ auto parent = AzurePath{"", container, "", path_to_file_parts};
+ parent.path_to_file_parts.pop_back();
+ parent.path_to_file =
internal::JoinAbstractPath(parent.path_to_file_parts);
+ if (parent.path_to_file.empty()) {
+ parent.full_path = parent.container;
+ } else {
+ parent.full_path = parent.container + internal::kSep +
parent.path_to_file;
+ }
+ return parent;
+ }
+
+ bool has_parent() const { return !path_to_file.empty(); }
+
+ bool empty() const { return container.empty() && path_to_file.empty(); }
+
+ bool operator==(const AzurePath& other) const {
+ return container == other.container && path_to_file == other.path_to_file;
+ }
+};
+
+Status PathNotFound(const AzurePath& path) {
+ return ::arrow::fs::internal::PathNotFound(path.full_path);
+}
+
+Status NotAFile(const AzurePath& path) {
+ return ::arrow::fs::internal::NotAFile(path.full_path);
+}
+
+Status ValidateFilePath(const AzurePath& path) {
+ if (path.container.empty()) {
+ return PathNotFound(path);
+ }
+
+ if (path.path_to_file.empty()) {
+ return NotAFile(path);
+ }
+ return Status::OK();
+}
+
+Status ErrorToStatus(const std::string& prefix,
+ const Azure::Storage::StorageException& exception) {
+ return Status::IOError(prefix, " Azure Error: ", exception.what());
+}
+
+template <typename ObjectResult>
+std::shared_ptr<const KeyValueMetadata> GetObjectMetadata(const ObjectResult&
result) {
+ auto md = std::make_shared<KeyValueMetadata>();
+ for (auto prop : result) {
+ md->Append(prop.first, prop.second);
+ }
+ return md;
+}
+
+class ObjectInputFile final : public io::RandomAccessFile {
+ public:
+ ObjectInputFile(std::shared_ptr<Azure::Storage::Blobs::BlobClient>&
blob_client,
+ const io::IOContext& io_context, const AzurePath& path,
+ int64_t size = kNoSize)
+ : blob_client_(std::move(blob_client)),
+ io_context_(io_context),
+ path_(path),
Review Comment:
It's definitely part of the coding style here to avoid unnecessary copies
where we can, but existing code may still have some. The usual process is to
avoid introducing more and clean up what we happen to notice and touch.
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -37,34 +43,330 @@ bool AzureOptions::Equals(const AzureOptions& other) const
{
credentials_kind == other.credentials_kind);
}
+Status AzureOptions::ConfigureAccountKeyCredentials(const std::string&
account_name,
+ const std::string&
account_key) {
+ if (this->backend == AzureBackend::Azurite) {
+ account_blob_url = "http://127.0.0.1:10000/" + account_name + "/";
+ account_dfs_url = "http://127.0.0.1:10000/" + account_name + "/";
+ } else {
+ account_dfs_url = "https://" + account_name + ".dfs.core.windows.net/";
+ account_blob_url = "https://" + account_name + ".blob.core.windows.net/";
+ }
+ storage_credentials_provider =
+
std::make_shared<Azure::Storage::StorageSharedKeyCredential>(account_name,
+
account_key);
+ credentials_kind = AzureCredentialsKind::StorageCredentials;
+ return Status::OK();
+}
+namespace {
+
+// An AzureFileSystem represents a single Azure storage account. AzurePath
describes a
+// container and path within that storage account.
+struct AzurePath {
+ std::string full_path;
+ std::string container;
+ std::string path_to_file;
+ std::vector<std::string> path_to_file_parts;
+
+ static Result<AzurePath> FromString(const std::string& s) {
+ // Example expected string format: testcontainer/testdir/testfile.txt
+ // container = testcontainer
+ // path_to_file = testdir/testfile.txt
+ // path_to_file_parts = [testdir, testfile.txt]
+ if (internal::IsLikelyUri(s)) {
+ return Status::Invalid(
+ "Expected an Azure object path of the form 'container/path...', got
a URI: '",
+ s, "'");
+ }
+ auto src = internal::RemoveTrailingSlash(s);
+ auto input_path = std::string(src.data());
+ src = internal::RemoveLeadingSlash(src);
+ auto first_sep = src.find_first_of(internal::kSep);
+ if (first_sep == 0) {
+ return Status::Invalid("Path cannot start with a separator ('",
input_path, "')");
+ }
+ if (first_sep == std::string::npos) {
+ return AzurePath{std::string(src), std::string(src), "", {}};
+ }
+ AzurePath path;
+ path.full_path = std::string(src);
+ path.container = std::string(src.substr(0, first_sep));
+ path.path_to_file = std::string(src.substr(first_sep + 1));
+ path.path_to_file_parts = internal::SplitAbstractPath(path.path_to_file);
+ RETURN_NOT_OK(Validate(path));
+ return path;
+ }
+
+ static Status Validate(const AzurePath& path) {
+ auto status = internal::ValidateAbstractPathParts(path.path_to_file_parts);
+ if (!status.ok()) {
+ return Status::Invalid(status.message(), " in path ", path.full_path);
+ } else {
+ return status;
+ }
+ }
+
+ AzurePath parent() const {
+ DCHECK(has_parent());
+ auto parent = AzurePath{"", container, "", path_to_file_parts};
+ parent.path_to_file_parts.pop_back();
+ parent.path_to_file =
internal::JoinAbstractPath(parent.path_to_file_parts);
+ if (parent.path_to_file.empty()) {
+ parent.full_path = parent.container;
+ } else {
+ parent.full_path = parent.container + internal::kSep +
parent.path_to_file;
+ }
+ return parent;
+ }
+
+ bool has_parent() const { return !path_to_file.empty(); }
+
+ bool empty() const { return container.empty() && path_to_file.empty(); }
+
+ bool operator==(const AzurePath& other) const {
+ return container == other.container && path_to_file == other.path_to_file;
+ }
+};
+
+Status PathNotFound(const AzurePath& path) {
+ return ::arrow::fs::internal::PathNotFound(path.full_path);
+}
+
+Status NotAFile(const AzurePath& path) {
+ return ::arrow::fs::internal::NotAFile(path.full_path);
+}
+
+Status ValidateFilePath(const AzurePath& path) {
+ if (path.container.empty()) {
+ return PathNotFound(path);
+ }
+
+ if (path.path_to_file.empty()) {
+ return NotAFile(path);
+ }
+ return Status::OK();
+}
+
+Status ErrorToStatus(const std::string& prefix,
+ const Azure::Storage::StorageException& exception) {
+ return Status::IOError(prefix, " Azure Error: ", exception.what());
+}
+
+template <typename ObjectResult>
+std::shared_ptr<const KeyValueMetadata> GetObjectMetadata(const ObjectResult&
result) {
+ auto md = std::make_shared<KeyValueMetadata>();
+ for (auto prop : result) {
+ md->Append(prop.first, prop.second);
+ }
+ return md;
+}
+
+class ObjectInputFile final : public io::RandomAccessFile {
+ public:
+ ObjectInputFile(std::shared_ptr<Azure::Storage::Blobs::BlobClient>
blob_client,
+ const io::IOContext& io_context, const AzurePath& path,
+ int64_t size = kNoSize)
+ : blob_client_(std::move(blob_client)),
+ io_context_(io_context),
+ path_(path),
+ content_length_(size) {}
+
+ Status Init() {
+ if (content_length_ != kNoSize) {
+ DCHECK_GE(content_length_, 0);
+ return Status::OK();
+ }
+ try {
+ auto properties = blob_client_->GetProperties();
+ content_length_ = properties.Value.BlobSize;
+ metadata_ = GetObjectMetadata(properties.Value.Metadata);
+ return Status::OK();
+ } catch (const Azure::Storage::StorageException& exception) {
+ if (exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound)
{
+ // Could be either container or blob not found.
+ return PathNotFound(path_);
+ }
+ return ErrorToStatus(
+ "When fetching properties for '" + blob_client_->GetUrl() + "': ",
exception);
+ }
+ }
+
+ Status CheckClosed(const char* action) const {
+ if (closed_) {
+ return Status::Invalid("Cannot ", action, " on closed file.");
+ }
+ return Status::OK();
+ }
+
+ Status CheckPosition(int64_t position, const char* action) const {
+ DCHECK_GE(content_length_, 0);
+ if (position < 0) {
+ return Status::Invalid("Cannot ", action, " from negative position");
+ }
+ if (position > content_length_) {
+ return Status::IOError("Cannot ", action, " past end of file");
+ }
+ return Status::OK();
+ }
+
+ // RandomAccessFile APIs
+
+ Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata() override {
+ return metadata_;
+ }
+
+ Future<std::shared_ptr<const KeyValueMetadata>> ReadMetadataAsync(
+ const io::IOContext& io_context) override {
+ return metadata_;
+ }
+
+ Status Close() override {
+ blob_client_ = nullptr;
+ closed_ = true;
+ return Status::OK();
+ }
+
+ bool closed() const override { return closed_; }
+
+ Result<int64_t> Tell() const override {
+ RETURN_NOT_OK(CheckClosed("tell"));
+ return pos_;
+ }
+
+ Result<int64_t> GetSize() override {
+ RETURN_NOT_OK(CheckClosed("size"));
+ return content_length_;
+ }
+
+ Status Seek(int64_t position) override {
+ RETURN_NOT_OK(CheckClosed("seek"));
+ RETURN_NOT_OK(CheckPosition(position, "seek"));
+
+ pos_ = position;
+ return Status::OK();
+ }
+
+ Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override
{
+ RETURN_NOT_OK(CheckClosed("read"));
+ RETURN_NOT_OK(CheckPosition(position, "read"));
+
+ nbytes = std::min(nbytes, content_length_ - position);
+ if (nbytes == 0) {
+ return 0;
+ }
+
+ // Read the desired range of bytes
+ Azure::Core::Http::HttpRange range{.Offset = position, .Length = nbytes};
+ Azure::Storage::Blobs::DownloadBlobToOptions download_options{.Range =
range};
Review Comment:
Designated initializers are not standard in C++17; they were added in C++20.
```suggestion
Azure::Storage::Blobs::DownloadBlobToOptions download_options{{position,
nbytes}};
```
@pitrou @felipecrv since it isn't breaking any CI, would we add an exception
for this feature?
##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -154,6 +137,328 @@ TEST(AzureFileSystem, OptionsCompare) {
EXPECT_TRUE(options.Equals(options));
}
+class TestAzureFileSystem : public ::testing::Test {
+ public:
+ std::shared_ptr<FileSystem> fs_;
+ std::shared_ptr<Azure::Storage::Blobs::BlobServiceClient> service_client_;
+ AzureOptions options_;
+ std::mt19937_64 generator_;
+ std::string container_name_;
+
+ void MakeFileSystem() {
+ const std::string& account_name = GetAzuriteEnv()->account_name();
+ const std::string& account_key = GetAzuriteEnv()->account_key();
+ options_.backend = AzureBackend::Azurite;
+ ASSERT_OK(options_.ConfigureAccountKeyCredentials(account_name,
account_key));
+ }
+
+ void SetUp() override {
+ ASSERT_THAT(GetAzuriteEnv(), NotNull());
+ ASSERT_OK(GetAzuriteEnv()->status());
+
+ MakeFileSystem();
+ generator_ = std::mt19937_64(std::random_device()());
+ container_name_ = RandomChars(32);
+ service_client_ =
std::make_shared<Azure::Storage::Blobs::BlobServiceClient>(
+ options_.account_blob_url, options_.storage_credentials_provider);
+ ASSERT_OK_AND_ASSIGN(fs_, AzureFileSystem::Make(options_));
+ auto container_client =
service_client_->GetBlobContainerClient(container_name_);
+ container_client.CreateIfNotExists();
+
+ auto blob_client =
container_client.GetBlockBlobClient(PreexistingObjectName());
+ blob_client.UploadFrom(reinterpret_cast<const uint8_t*>(kLoremIpsum),
+ strlen(kLoremIpsum));
+ }
+
+ void TearDown() override {
+ auto containers = service_client_->ListBlobContainers();
+ for (auto container : containers.BlobContainers) {
+ auto container_client =
service_client_->GetBlobContainerClient(container.Name);
+ container_client.DeleteIfExists();
+ }
+ }
+
+ std::string PreexistingContainerName() const { return container_name_; }
+
+ std::string PreexistingContainerPath() const {
+ return PreexistingContainerName() + '/';
+ }
+
+ static std::string PreexistingObjectName() { return "test-object-name"; }
+
+ std::string PreexistingObjectPath() const {
+ return PreexistingContainerPath() + PreexistingObjectName();
+ }
+
+ std::string NotFoundObjectPath() { return PreexistingContainerPath() +
"not-found"; }
+
+ std::string RandomLine(int lineno, std::size_t width) {
+ auto line = std::to_string(lineno) + ": ";
+ line += RandomChars(width - line.size() - 1);
+ line += '\n';
+ return line;
+ }
+
+ uint8_t RandomInteger() {
+ return std::uniform_int_distribution<std::uint8_t>()(generator_);
+ }
+
+ std::size_t RandomIndex(std::size_t end) {
+ return std::uniform_int_distribution<std::size_t>(0, end - 1)(generator_);
+ }
+
+ std::string RandomChars(std::size_t count) {
+ auto const fillers = std::string("abcdefghijlkmnopqrstuvwxyz0123456789");
+ std::uniform_int_distribution<std::size_t> d(0, fillers.size() - 1);
+ std::string s;
+ std::generate_n(std::back_inserter(s), count, [&] { return
fillers[d(generator_)]; });
+ return s;
+ }
+
+ void UploadLines(std::vector<std::string> lines, const char* path_to_file,
+ int total_size) {
+ // TODO: Switch to using Azure filesystem to write once its implemented.
+ auto blob_client =
service_client_->GetBlobContainerClient(PreexistingContainerName())
+ .GetBlockBlobClient(path_to_file);
+ std::string all_lines = std::accumulate(lines.begin(), lines.end(),
std::string(""));
+ blob_client.UploadFrom(reinterpret_cast<const uint8_t*>(all_lines.data()),
+ total_size);
+ }
+};
+
+TEST_F(TestAzureFileSystem, OpenInputStreamString) {
+ std::shared_ptr<io::InputStream> stream;
+ ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));
+
+ std::array<char, 1024> buffer{};
+ std::int64_t size;
+ ASSERT_OK_AND_ASSIGN(size, stream->Read(buffer.size(), buffer.data()));
+
+ EXPECT_EQ(std::string(buffer.data(), size), kLoremIpsum);
+}
+
+TEST_F(TestAzureFileSystem, OpenInputStreamStringBuffers) {
+ std::shared_ptr<io::InputStream> stream;
+ ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));
+
+ std::string contents;
+ std::shared_ptr<Buffer> buffer;
+ do {
+ ASSERT_OK_AND_ASSIGN(buffer, stream->Read(16));
+ contents.append(buffer->ToString());
+ } while (buffer && buffer->size() != 0);
+
+ EXPECT_EQ(contents, kLoremIpsum);
+}
+
+TEST_F(TestAzureFileSystem, OpenInputStreamInfo) {
+ // TODO: When implemented use ASSERT_OK_AND_ASSIGN(info,
+ // fs->GetFileInfo(PreexistingObjectPath()));
+ arrow::fs::FileInfo info(PreexistingObjectPath(), FileType::File);
+
+ std::shared_ptr<io::InputStream> stream;
+ ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(info));
+
+ std::array<char, 1024> buffer{};
+ std::int64_t size;
+ ASSERT_OK_AND_ASSIGN(size, stream->Read(buffer.size(), buffer.data()));
+
+ EXPECT_EQ(std::string(buffer.data(), size), kLoremIpsum);
+}
+
+TEST_F(TestAzureFileSystem, OpenInputStreamEmpty) {
+ const auto path_to_file = "empty-object.txt";
+ const auto path = PreexistingContainerPath() + path_to_file;
+ service_client_->GetBlobContainerClient(PreexistingContainerName())
+ .GetBlockBlobClient(path_to_file)
+ .UploadFrom(nullptr, 0);
+
+ ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenInputStream(path));
+ std::array<char, 1024> buffer{};
+ std::int64_t size;
+ ASSERT_OK_AND_ASSIGN(size, stream->Read(buffer.size(), buffer.data()));
+ EXPECT_EQ(size, 0);
+}
+
+TEST_F(TestAzureFileSystem, OpenInputStreamNotFound) {
+ ASSERT_RAISES(IOError, fs_->OpenInputStream(NotFoundObjectPath()));
+}
+
+TEST_F(TestAzureFileSystem, OpenInputStreamInfoInvalid) {
+ // TODO: When implemented use ASSERT_OK_AND_ASSIGN(info,
+ // fs->GetFileInfo(PreexistingBucketPath()));
+ arrow::fs::FileInfo info(PreexistingContainerPath(), FileType::Directory);
+ ASSERT_RAISES(IOError, fs_->OpenInputStream(info));
+
+ // TODO: When implemented use ASSERT_OK_AND_ASSIGN(info,
+ // fs->GetFileInfo(NotFoundObjectPath()));
+ arrow::fs::FileInfo info2(PreexistingContainerPath(), FileType::NotFound);
+ ASSERT_RAISES(IOError, fs_->OpenInputStream(info2));
+}
+
+TEST_F(TestAzureFileSystem, OpenInputStreamUri) {
+ ASSERT_RAISES(Invalid, fs_->OpenInputStream("abfss://" +
PreexistingObjectPath()));
+}
+
+TEST_F(TestAzureFileSystem, OpenInputStreamReadMetadata) {
+ const std::string object_name = "OpenInputStreamMetadataTest/simple.txt";
+
+ service_client_->GetBlobContainerClient(PreexistingContainerName())
+ .GetBlobClient(PreexistingObjectName())
+ .SetMetadata(Azure::Storage::Metadata{{"key0", "value0"}});
Review Comment:
Please do!
##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -154,6 +137,330 @@ TEST(AzureFileSystem, OptionsCompare) {
EXPECT_TRUE(options.Equals(options));
}
+class TestAzureFileSystem : public ::testing::Test {
+ public:
+ std::shared_ptr<FileSystem> fs_;
+ std::shared_ptr<Azure::Storage::Blobs::BlobServiceClient> service_client_;
+ std::mt19937_64 generator_;
+ std::string container_name_;
+
+ TestAzureFileSystem() : generator_(std::random_device()()) {}
+
+ AzureOptions MakeOptions() {
+ const std::string& account_name = GetAzuriteEnv()->account_name();
+ const std::string& account_key = GetAzuriteEnv()->account_key();
+ AzureOptions options;
+ options.backend = AzureBackend::Azurite;
+ ARROW_EXPECT_OK(options.ConfigureAccountKeyCredentials(account_name,
account_key));
+ return options;
+ }
+
+ void SetUp() override {
+ ASSERT_THAT(GetAzuriteEnv(), NotNull());
+ ASSERT_OK(GetAzuriteEnv()->status());
+
+ container_name_ = RandomChars(32);
+ auto options = MakeOptions();
+ service_client_ =
std::make_shared<Azure::Storage::Blobs::BlobServiceClient>(
+ options.account_blob_url, options.storage_credentials_provider);
+ ASSERT_OK_AND_ASSIGN(fs_, AzureFileSystem::Make(options));
+ auto container_client =
service_client_->GetBlobContainerClient(container_name_);
+ container_client.CreateIfNotExists();
+
+ auto blob_client =
container_client.GetBlockBlobClient(PreexistingObjectName());
+ blob_client.UploadFrom(reinterpret_cast<const uint8_t*>(kLoremIpsum),
+ strlen(kLoremIpsum));
+ }
+
+ void TearDown() override {
+ auto containers = service_client_->ListBlobContainers();
+ for (auto container : containers.BlobContainers) {
+ auto container_client =
service_client_->GetBlobContainerClient(container.Name);
+ container_client.DeleteIfExists();
+ }
+ }
+
+ std::string PreexistingContainerName() const { return container_name_; }
+
+ std::string PreexistingContainerPath() const {
+ return PreexistingContainerName() + '/';
+ }
+
+ static std::string PreexistingObjectName() { return "test-object-name"; }
+
+ std::string PreexistingObjectPath() const {
+ return PreexistingContainerPath() + PreexistingObjectName();
+ }
+
+ std::string NotFoundObjectPath() { return PreexistingContainerPath() +
"not-found"; }
+
+ std::string RandomLine(int lineno, std::size_t width) {
+ auto line = std::to_string(lineno) + ": ";
+ line += RandomChars(width - line.size() - 1);
+ line += '\n';
+ return line;
+ }
+
+ std::size_t RandomIndex(std::size_t end) {
+ return std::uniform_int_distribution<std::size_t>(0, end - 1)(generator_);
+ }
+
+ std::string RandomChars(std::size_t count) {
+ auto const fillers = std::string("abcdefghijlkmnopqrstuvwxyz0123456789");
+ std::uniform_int_distribution<std::size_t> d(0, fillers.size() - 1);
+ std::string s;
+ std::generate_n(std::back_inserter(s), count, [&] { return
fillers[d(generator_)]; });
+ return s;
+ }
+
+ void UploadLines(const std::vector<std::string>& lines, const char*
path_to_file,
+ int total_size) {
+ // TODO: Switch to using Azure filesystem to write once its implemented.
+ auto blob_client =
service_client_->GetBlobContainerClient(PreexistingContainerName())
+ .GetBlockBlobClient(path_to_file);
+ std::string all_lines = std::accumulate(lines.begin(), lines.end(),
std::string(""));
+ blob_client.UploadFrom(reinterpret_cast<const uint8_t*>(all_lines.data()),
+ total_size);
+ }
+};
+
+TEST_F(TestAzureFileSystem, OpenInputStreamString) {
+ std::shared_ptr<io::InputStream> stream;
+ ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));
+
+ std::array<char, 1024> buffer{};
+ std::int64_t size;
+ ASSERT_OK_AND_ASSIGN(size, stream->Read(buffer.size(), buffer.data()));
+
+ EXPECT_EQ(std::string(buffer.data(), size), kLoremIpsum);
+}
+
+TEST_F(TestAzureFileSystem, OpenInputStreamStringBuffers) {
+ std::shared_ptr<io::InputStream> stream;
+ ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));
+
+ std::string contents;
+ std::shared_ptr<Buffer> buffer;
+ do {
+ ASSERT_OK_AND_ASSIGN(buffer, stream->Read(16));
+ contents.append(buffer->ToString());
+ } while (buffer && buffer->size() != 0);
+
+ EXPECT_EQ(contents, kLoremIpsum);
+}
+
+TEST_F(TestAzureFileSystem, OpenInputStreamInfo) {
+ // TODO: When implemented use ASSERT_OK_AND_ASSIGN(info,
+ // fs->GetFileInfo(PreexistingObjectPath()));
+ arrow::fs::FileInfo info(PreexistingObjectPath(), FileType::File);
+
+ std::shared_ptr<io::InputStream> stream;
+ ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(info));
+
+ std::array<char, 1024> buffer{};
+ std::int64_t size;
+ ASSERT_OK_AND_ASSIGN(size, stream->Read(buffer.size(), buffer.data()));
+
+ EXPECT_EQ(std::string(buffer.data(), size), kLoremIpsum);
+}
+
+TEST_F(TestAzureFileSystem, OpenInputStreamEmpty) {
+ const auto path_to_file = "empty-object.txt";
+ const auto path = PreexistingContainerPath() + path_to_file;
+ service_client_->GetBlobContainerClient(PreexistingContainerName())
+ .GetBlockBlobClient(path_to_file)
+ .UploadFrom(nullptr, 0);
+
+ ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenInputStream(path));
+ std::array<char, 1024> buffer{};
+ std::int64_t size;
+ ASSERT_OK_AND_ASSIGN(size, stream->Read(buffer.size(), buffer.data()));
+ EXPECT_EQ(size, 0);
+}
+
+TEST_F(TestAzureFileSystem, OpenInputStreamNotFound) {
+ ASSERT_RAISES(IOError, fs_->OpenInputStream(NotFoundObjectPath()));
+}
+
+TEST_F(TestAzureFileSystem, OpenInputStreamInfoInvalid) {
+ // TODO: When implemented use ASSERT_OK_AND_ASSIGN(info,
+ // fs->GetFileInfo(PreexistingBucketPath()));
+ arrow::fs::FileInfo info(PreexistingContainerPath(), FileType::Directory);
+ ASSERT_RAISES(IOError, fs_->OpenInputStream(info));
+
+ // TODO: When implemented use ASSERT_OK_AND_ASSIGN(info,
+ // fs->GetFileInfo(NotFoundObjectPath()));
+ arrow::fs::FileInfo info2(PreexistingContainerPath(), FileType::NotFound);
+ ASSERT_RAISES(IOError, fs_->OpenInputStream(info2));
+}
+
+TEST_F(TestAzureFileSystem, OpenInputStreamUri) {
+ ASSERT_RAISES(Invalid, fs_->OpenInputStream("abfss://" +
PreexistingObjectPath()));
+}
+
+TEST_F(TestAzureFileSystem, OpenInputStreamTrailingSlash) {
+ ASSERT_RAISES(IOError, fs_->OpenInputStream(PreexistingObjectPath() + '/'));
+}
+
+TEST_F(TestAzureFileSystem, OpenInputStreamReadMetadata) {
+ const std::string object_name = "OpenInputStreamMetadataTest/simple.txt";
+
+ service_client_->GetBlobContainerClient(PreexistingContainerName())
+ .GetBlobClient(PreexistingObjectName())
+ .SetMetadata(Azure::Storage::Metadata{{"key0", "value0"}});
+
+ std::shared_ptr<io::InputStream> stream;
+ ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));
+
+ std::shared_ptr<const KeyValueMetadata> actual;
+ ASSERT_OK_AND_ASSIGN(actual, stream->ReadMetadata());
+ ASSERT_OK_AND_EQ("value0", actual->Get("key0"));
+}
+
+TEST_F(TestAzureFileSystem, OpenInputStreamClosed) {
+ ASSERT_OK_AND_ASSIGN(auto stream,
fs_->OpenInputStream(PreexistingObjectPath()));
+ ASSERT_OK(stream->Close());
+ std::array<char, 16> buffer{};
+ ASSERT_RAISES(Invalid, stream->Read(buffer.size(), buffer.data()));
+ ASSERT_RAISES(Invalid, stream->Read(buffer.size()));
+ ASSERT_RAISES(Invalid, stream->Tell());
+}
+
+TEST_F(TestAzureFileSystem, OpenInputFileMixedReadVsReadAt) {
+ // Create a file large enough to make the random access tests non-trivial.
+ auto constexpr kLineWidth = 100;
+ auto constexpr kLineCount = 4096;
+ std::vector<std::string> lines(kLineCount);
+ int lineno = 0;
+ std::generate_n(lines.begin(), lines.size(),
+ [&] { return RandomLine(++lineno, kLineWidth); });
+
+ const auto path_to_file = "OpenInputFileMixedReadVsReadAt/object-name";
+ const auto path = PreexistingContainerPath() + path_to_file;
+
+ UploadLines(lines, path_to_file, kLineCount * kLineWidth);
+
+ std::shared_ptr<io::RandomAccessFile> file;
+ ASSERT_OK_AND_ASSIGN(file, fs_->OpenInputFile(path));
+ for (int i = 0; i != 32; ++i) {
+ SCOPED_TRACE("Iteration " + std::to_string(i));
+ // Verify sequential reads work as expected.
+ std::array<char, kLineWidth> buffer{};
+ std::int64_t size;
+ {
+ ASSERT_OK_AND_ASSIGN(auto actual, file->Read(kLineWidth));
+ EXPECT_EQ(lines[2 * i], actual->ToString());
+ }
+ {
+ ASSERT_OK_AND_ASSIGN(size, file->Read(buffer.size(), buffer.data()));
+ EXPECT_EQ(size, kLineWidth);
+ auto actual = std::string{buffer.begin(), buffer.end()};
+ EXPECT_EQ(lines[2 * i + 1], actual);
+ }
+
+ // Verify random reads interleave too.
+ auto const index = RandomIndex(kLineCount);
+ auto const position = index * kLineWidth;
+ ASSERT_OK_AND_ASSIGN(size, file->ReadAt(position, buffer.size(),
buffer.data()));
+ EXPECT_EQ(size, kLineWidth);
+ auto actual = std::string{buffer.begin(), buffer.end()};
+ EXPECT_EQ(lines[index], actual);
+
+ // Verify random reads using buffers work.
+ ASSERT_OK_AND_ASSIGN(auto b, file->ReadAt(position, kLineWidth));
+ EXPECT_EQ(lines[index], b->ToString());
+ }
+}
+
+TEST_F(TestAzureFileSystem, OpenInputFileRandomSeek) {
+ // Create a file large enough to make the random access tests non-trivial.
+ auto constexpr kLineWidth = 100;
+ auto constexpr kLineCount = 4096;
+ std::vector<std::string> lines(kLineCount);
+ int lineno = 0;
+ std::generate_n(lines.begin(), lines.size(),
+ [&] { return RandomLine(++lineno, kLineWidth); });
+
+ const auto path_to_file = "OpenInputFileRandomSeek/object-name";
+ const auto path = PreexistingContainerPath() + path_to_file;
+ std::shared_ptr<io::OutputStream> output;
+
+ UploadLines(lines, path_to_file, kLineCount * kLineWidth);
+
+ std::shared_ptr<io::RandomAccessFile> file;
+ ASSERT_OK_AND_ASSIGN(file, fs_->OpenInputFile(path));
+ for (int i = 0; i != 32; ++i) {
+ SCOPED_TRACE("Iteration " + std::to_string(i));
+ // Verify sequential reads work as expected.
+ auto const index = RandomIndex(kLineCount);
+ auto const position = index * kLineWidth;
+ ASSERT_OK(file->Seek(position));
+ ASSERT_OK_AND_ASSIGN(auto actual, file->Read(kLineWidth));
+ EXPECT_EQ(lines[index], actual->ToString());
+ }
+}
+
+TEST_F(TestAzureFileSystem, OpenInputFileIoContext) {
+ // Create a test file.
+ const auto path_to_file = "OpenInputFileIoContext/object-name";
+ const auto path = PreexistingContainerPath() + path_to_file;
+ const std::string contents = "The quick brown fox jumps over the lazy dog";
+
+ auto blob_client =
service_client_->GetBlobContainerClient(PreexistingContainerName())
+ .GetBlockBlobClient(path_to_file);
+ blob_client.UploadFrom(reinterpret_cast<const uint8_t*>(contents.data()),
+ contents.length());
+
+ std::shared_ptr<io::RandomAccessFile> file;
+ ASSERT_OK_AND_ASSIGN(file, fs_->OpenInputFile(path));
+ EXPECT_EQ(fs_->io_context().external_id(), file->io_context().external_id());
+}
+
+TEST_F(TestAzureFileSystem, OpenInputFileInfo) {
+ // TODO: When implemented use ASSERT_OK_AND_ASSIGN(info,
+ // fs->GetFileInfo(PreexistingObjectPath()));
+ arrow::fs::FileInfo info(PreexistingObjectPath(), FileType::File);
+
+ std::shared_ptr<io::RandomAccessFile> file;
+ ASSERT_OK_AND_ASSIGN(file, fs_->OpenInputFile(info));
+
+ std::array<char, 1024> buffer{};
+ std::int64_t size;
+ auto constexpr kStart = 16;
+ ASSERT_OK_AND_ASSIGN(size, file->ReadAt(kStart, buffer.size(),
buffer.data()));
+
+ auto const expected = std::string(kLoremIpsum).substr(kStart);
+ EXPECT_EQ(std::string(buffer.data(), size), expected);
+}
+
+TEST_F(TestAzureFileSystem, OpenInputFileNotFound) {
+ ASSERT_RAISES(IOError, fs_->OpenInputFile(NotFoundObjectPath()));
+}
+
+TEST_F(TestAzureFileSystem, OpenInputFileInfoInvalid) {
+ // TODO: When implemented use ASSERT_OK_AND_ASSIGN(info,
+ // fs->GetFileInfo(PreexistingContainerPath()));
Review Comment:
Not necessary in the scope of this PR, but FWIW this should be as simple as a
call to BlobClient::GetProperties, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]