This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-cookbook.git
The following commit(s) were added to refs/heads/main by this push:
new a2547ea Add a basic dataset reading example (#85)
a2547ea is described below
commit a2547eae0e51343292baaab75bedc76e307c3650
Author: Weston Pace <[email protected]>
AuthorDate: Wed Oct 27 10:46:20 2021 -1000
Add a basic dataset reading example (#85)
* Added a basic dataset reading example
* Remove cmake debugging
* Adding newline to end of common.cc
* Apply suggestions from code review
Co-authored-by: Nic Crane <[email protected]>
Co-authored-by: Nic Crane <[email protected]>
---
Makefile | 2 +-
cpp/code/CMakeLists.txt | 9 ++-
cpp/code/common.cc | 17 ++++-
cpp/code/common.h | 1 +
cpp/code/datasets.cc | 159 ++++++++++++++++++++++++++++++++++++++++++++
cpp/source/basic.rst | 1 +
cpp/source/create.rst | 1 +
cpp/source/datasets.rst | 67 +++++++++++++++++++
cpp/source/index.rst | 2 +
testdata/airquality.parquet | Bin 0 -> 4799 bytes
10 files changed, 255 insertions(+), 4 deletions(-)
diff --git a/Makefile b/Makefile
index 2acc8df..69f0e47 100644
--- a/Makefile
+++ b/Makefile
@@ -58,7 +58,7 @@ cpptest:
@echo ">>> Running C++ Tests/Snippets <<<\n"
rm -rf cpp/recipe-test-build
mkdir cpp/recipe-test-build
- cd cpp/recipe-test-build && cmake ../code -DCMAKE_BUILD_TYPE=Debug && cmake --build . && ctest -j 1
+ cd cpp/recipe-test-build && cmake ../code -DCMAKE_BUILD_TYPE=Debug && cmake --build . && ctest --output-on-failure -j 1
mkdir -p cpp/build
cp cpp/recipe-test-build/recipes_out.arrow cpp/build
diff --git a/cpp/code/CMakeLists.txt b/cpp/code/CMakeLists.txt
index e24a245..a57b6eb 100644
--- a/cpp/code/CMakeLists.txt
+++ b/cpp/code/CMakeLists.txt
@@ -18,9 +18,11 @@ set(gtest_force_shared_crt ON CACHE BOOL "" FORCE)
FetchContent_MakeAvailable(googletest)
# Add Arrow
-find_package(Arrow REQUIRED)
+find_package(Arrow REQUIRED COMPONENTS dataset parquet)
-set(CMAKE_CXX_CLANG_TIDY "clang-tidy")
+if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
+ set(CMAKE_CXX_CLANG_TIDY "clang-tidy")
+endif()
# Create test targets
enable_testing()
@@ -37,6 +39,8 @@ function(RECIPE TARGET)
target_link_libraries(
${TARGET}
arrow_shared
+ arrow_dataset
+ parquet
gtest
)
if (MSVC)
@@ -50,3 +54,4 @@ endfunction()
recipe(basic_arrow)
recipe(creating_arrow_objects)
+recipe(datasets)
diff --git a/cpp/code/common.cc b/cpp/code/common.cc
index 43f27c0..4a79907 100644
--- a/cpp/code/common.cc
+++ b/cpp/code/common.cc
@@ -17,6 +17,7 @@
#include "common.h"
+#include <filesystem>
#include <sstream>
#include <unordered_map>
@@ -159,4 +160,18 @@ arrow::Status DumpRecipeOutput(const std::string& output_filename) {
arrow::ipc::MakeStreamWriter(out_stream.get(), merged_table->schema()));
RETURN_NOT_OK(writer->WriteTable(*merged_table));
return writer->Close();
-}
\ No newline at end of file
+}
+
+// Builds the path of a file inside the repository's "testdata" directory.
+//
+// The search starts at the current working directory and walks up through
+// each ancestor until one of them contains a "testdata" directory. Only the
+// "testdata" directory is checked for existence; the returned path is simply
+// that directory joined with `test_data_name` and may not exist.
+//
+// Returns the joined path on success, Status::IOError if the current working
+// directory cannot be determined, and Status::Invalid if the filesystem root
+// is reached without finding a "testdata" directory.
+arrow::Result<std::string> FindTestDataFile(const std::string& test_data_name) {
+  std::error_code ec;
+  std::filesystem::path search_dir = std::filesystem::current_path(ec);
+  if (ec) {
+    return arrow::Status::IOError("Could not determine the current directory: ",
+                                  ec.message());
+  }
+  while (true) {
+    const std::filesystem::path test_data_dir = search_dir / "testdata";
+    // A lookup error is treated the same as "not found" and the search simply
+    // continues with the parent directory.
+    if (std::filesystem::is_directory(test_data_dir, ec)) {
+      return (test_data_dir / test_data_name).string();
+    }
+    // The root directory is its own parent, so has_parent_path() stays true
+    // there. Stop once moving up no longer changes the path, otherwise the
+    // loop would never terminate when run outside of the repository.
+    const std::filesystem::path parent_dir = search_dir.parent_path();
+    if (parent_dir.empty() || parent_dir == search_dir) {
+      break;
+    }
+    search_dir = parent_dir;
+  }
+  return arrow::Status::Invalid(
+      "Could not locate testdata directory.  Tests must be "
+      "run inside of the cookbook repo");
+}
diff --git a/cpp/code/common.h b/cpp/code/common.h
index e9f5d9e..26d074b 100644
--- a/cpp/code/common.h
+++ b/cpp/code/common.h
@@ -49,5 +49,6 @@ void StartRecipe(const std::string& recipe_name);
void EndRecipe(const std::string& recipe_name);
arrow::Status DumpRecipeOutput(const std::string& output_filename);
bool HasRecipeOutput();
+arrow::Result<std::string> FindTestDataFile(const std::string& test_data_name);
#endif // ARROW_COOKBOOK_COMMON_H
diff --git a/cpp/code/datasets.cc b/cpp/code/datasets.cc
new file mode 100644
index 0000000..cc067cc
--- /dev/null
+++ b/cpp/code/datasets.cc
@@ -0,0 +1,159 @@
+#include <arrow/api.h>
+#include <arrow/dataset/api.h>
+#include <arrow/filesystem/api.h>
+#include <gtest/gtest.h>
+#include <parquet/arrow/reader.h>
+
+#include <filesystem>
+#include <memory>
+
+#include "common.h"
+
+// Test fixture which prepares a partitioned copy of the airquality sample
+// data for the dataset recipes.
+//
+// SetUp() reads testdata/airquality.parquet into a table and writes it back
+// out as parquet files, hive-partitioned by the Month and Day columns,
+// underneath <temp directory>/cookbook_cpp_airquality.
+class DatasetReadingTest : public ::testing::Test {
+ public:
+  // Creates the partitioned dataset on disk before each test.
+  void SetUp() override {
+    const std::filesystem::path partitioned_dir =
+        std::filesystem::temp_directory_path() / "cookbook_cpp_airquality";
+    // Best-effort removal of anything left behind by an earlier run so that
+    // this run starts from an empty directory. A failure is deliberately
+    // ignored here; if the directory is unusable the write below reports it.
+    std::error_code ignored;
+    std::filesystem::remove_all(partitioned_dir, ignored);
+    // Convert explicitly: a path only converts implicitly to its native string
+    // type, which is std::wstring rather than std::string on Windows.
+    airquality_partitioned_dir_ = partitioned_dir.string();
+
+    std::shared_ptr<arrow::fs::FileSystem> fs =
+        std::make_shared<arrow::fs::LocalFileSystem>();
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Table> airquality,
+                         ReadInAirQuality(fs.get()));
+    WritePartitionedAirQuality(airquality, std::move(fs));
+  }
+
+  // Base directory of the partitioned dataset written by SetUp().
+  const std::string& airquality_basedir() const { return airquality_partitioned_dir_; }
+
+ private:
+  // Writes `airquality` to `fs` as parquet files partitioned by Month and Day
+  // below airquality_partitioned_dir_. Failures are reported through gtest
+  // assertions, which is why this returns void.
+  void WritePartitionedAirQuality(const std::shared_ptr<arrow::Table>& airquality,
+                                  std::shared_ptr<arrow::fs::FileSystem> fs) {
+    // The dataset writer consumes a scanner, so wrap the in-memory table in a
+    // record batch reader and build a scanner on top of it.
+    std::shared_ptr<arrow::RecordBatchReader> table_reader =
+        std::make_shared<arrow::TableBatchReader>(*airquality);
+
+    std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
+        arrow::dataset::ScannerBuilder::FromRecordBatchReader(std::move(table_reader));
+    ASSERT_OK(scanner_builder->UseThreads(true));
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::dataset::Scanner> scanner,
+                         scanner_builder->Finish());
+
+    // The columns used to partition the data, both as 32-bit integers.
+    std::shared_ptr<arrow::Schema> partitioning_schema = arrow::schema(
+        {arrow::field("Month", arrow::int32()), arrow::field("Day", arrow::int32())});
+    std::shared_ptr<arrow::dataset::PartitioningFactory> partitioning_factory =
+        arrow::dataset::HivePartitioning::MakeFactory();
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::dataset::Partitioning> partitioning,
+                         partitioning_factory->Finish(partitioning_schema));
+
+    std::shared_ptr<arrow::dataset::ParquetFileFormat> parquet_format =
+        std::make_shared<arrow::dataset::ParquetFileFormat>();
+
+    arrow::dataset::FileSystemDatasetWriteOptions write_options;
+    write_options.filesystem = std::move(fs);
+    write_options.partitioning = std::move(partitioning);
+    write_options.base_dir = airquality_partitioned_dir_;
+    write_options.basename_template = "chunk-{i}.parquet";
+    write_options.file_write_options = parquet_format->DefaultWriteOptions();
+
+    ASSERT_OK(
+        arrow::dataset::FileSystemDataset::Write(write_options, std::move(scanner)));
+  }
+
+  // Reads testdata/airquality.parquet through `fs` into an in-memory table.
+  //
+  // Returns the table, or the first error Status encountered while locating,
+  // opening or reading the file.
+  static arrow::Result<std::shared_ptr<arrow::Table>> ReadInAirQuality(
+      arrow::fs::FileSystem* fs) {
+    ARROW_ASSIGN_OR_RAISE(std::string airquality_path,
+                          FindTestDataFile("airquality.parquet"));
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::RandomAccessFile> file,
+                          fs->OpenInputFile(airquality_path));
+    // NOTE(review): Open() returns the reader directly rather than a Result,
+    // so it presumably reports failures by throwing - confirm whether that
+    // needs to be translated into a Status here.
+    std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
+        parquet::ParquetFileReader::Open(file);
+    std::unique_ptr<parquet::arrow::FileReader> reader;
+    ARROW_RETURN_NOT_OK(parquet::arrow::FileReader::Make(
+        arrow::default_memory_pool(), std::move(parquet_reader), &reader));
+    std::shared_ptr<arrow::Table> table;
+    ARROW_RETURN_NOT_OK(reader->ReadTable(&table));
+    return table;
+  }
+
+  // Directory the partitioned dataset is written to; set by SetUp().
+  std::string airquality_partitioned_dir_;
+};
+
+// Lists, opens and scans the partitioned airquality dataset. Each
+// StartRecipe/EndRecipe pair delimits one snippet used by the cookbook docs.
+TEST_F(DatasetReadingTest, DatasetRead) {
+  StartRecipe("ListPartitionedDataset");
+  const std::string& dataset_root = airquality_basedir();
+
+  // The dataset lives on the local disk so we need a local filesystem
+  auto filesystem = std::make_shared<arrow::fs::LocalFileSystem>();
+
+  // A file selector describes which files are part of the dataset.
+  // Partitioned datasets are typically spread over nested directories
+  // so we ask for a recursive search below the base directory. A dataset
+  // can also be created from an explicit list of one or more paths.
+  arrow::fs::FileSelector selector;
+  selector.recursive = true;
+  selector.base_dir = dataset_root;
+
+  // Print the first few files to show how the data is partitioned.
+  // Reading a dataset does not require this step.
+  ASSERT_OK_AND_ASSIGN(std::vector<arrow::fs::FileInfo> entries,
+                       filesystem->GetFileInfo(selector));
+  int remaining_to_print = 10;
+  for (const arrow::fs::FileInfo& entry : entries) {
+    if (!entry.IsFile()) {
+      continue;
+    }
+    rout << entry.path().substr(dataset_root.size()) << std::endl;
+    if (--remaining_to_print == 0) {
+      rout << "..." << std::endl;
+      break;
+    }
+  }
+
+  EndRecipe("ListPartitionedDataset");
+  StartRecipe("CreatingADataset");
+  // The file format describes how the individual files are encoded. Our files
+  // are parquet files. Other formats, such as Arrow-IPC or CSV, could be used
+  // instead and the parquet format itself can be customized with additional
+  // reading & parsing options.
+  auto parquet_format = std::make_shared<arrow::dataset::ParquetFileFormat>();
+
+  // The dataset factory uses a partitioning factory to infer the partitioning
+  // schema from the filenames. We only need to say which flavor of
+  // partitioning is in use which, in our case, is "hive".
+  //
+  // A partitioning scheme could also be created manually from a schema but
+  // inference works well for hive partitioning so that is rarely needed.
+  std::shared_ptr<arrow::dataset::PartitioningFactory> hive_factory =
+      arrow::dataset::HivePartitioning::MakeFactory();
+
+  arrow::dataset::FileSystemFactoryOptions factory_options;
+  factory_options.partitioning = hive_factory;
+
+  // Combine the filesystem, selector, format and options into a dataset factory
+  ASSERT_OK_AND_ASSIGN(
+      std::shared_ptr<arrow::dataset::DatasetFactory> factory,
+      arrow::dataset::FileSystemDatasetFactory::Make(filesystem, selector,
+                                                     parquet_format, factory_options));
+
+  // Finishing the factory creates the dataset. This scans the dataset
+  // directory to find all the files and may read some file metadata in order
+  // to determine the dataset schema.
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::dataset::Dataset> dataset,
+                       factory->Finish());
+
+  rout << "We discovered the following schema for the dataset:" << std::endl;
+  rout << std::endl;
+  rout << dataset->schema()->ToString() << std::endl;
+  EndRecipe("CreatingADataset");
+  StartRecipe("ScanningADataset");
+
+  // Configure and build a scanner for the dataset
+  arrow::dataset::ScannerBuilder builder(dataset);
+  ASSERT_OK(builder.UseAsync(true));
+  ASSERT_OK(builder.UseThreads(true));
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::dataset::Scanner> scanner,
+                       builder.Finish());
+
+  // Read the whole dataset into a table. The scanner offers a variety of
+  // other methods as well
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Table> contents, scanner->ToTable());
+  rout << "Read in a table with " << contents->num_rows() << " rows and "
+       << contents->num_columns() << " columns";
+  EndRecipe("ScanningADataset");
+}
diff --git a/cpp/source/basic.rst b/cpp/source/basic.rst
index 913781a..b91cc0e 100644
--- a/cpp/source/basic.rst
+++ b/cpp/source/basic.rst
@@ -14,6 +14,7 @@
.. KIND, either express or implied. See the License for the
.. specific language governing permissions and limitations
.. under the License.
+
===================================
Working with the C++ Implementation
===================================
diff --git a/cpp/source/create.rst b/cpp/source/create.rst
index 5e5483c..393747b 100644
--- a/cpp/source/create.rst
+++ b/cpp/source/create.rst
@@ -14,6 +14,7 @@
.. KIND, either express or implied. See the License for the
.. specific language governing permissions and limitations
.. under the License.
+
======================
Creating Arrow Objects
======================
diff --git a/cpp/source/datasets.rst b/cpp/source/datasets.rst
new file mode 100644
index 0000000..f434da5
--- /dev/null
+++ b/cpp/source/datasets.rst
@@ -0,0 +1,67 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements. See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership. The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied. See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+============================
+Reading and Writing Datasets
+============================
+
+This section contains a number of recipes for reading and writing
+datasets. Datasets are a collection of one or more files containing
+tabular data.
+
+.. contents::
+
+Read a Partitioned Dataset
+==========================
+
+The individual data files that make up a dataset will often be
+distributed across several different directories according to some
+kind of partitioning scheme.
+
+This simplifies management of the data and also allows for partial
+reads of the dataset by inspecting the file paths and utilizing the
+guarantees provided by the partitioning scheme.
+
+This recipe demonstrates the basics of reading a partitioned dataset.
+First let us inspect our data:
+
+.. recipe:: ../code/datasets.cc ListPartitionedDataset
+ :caption: A listing of files in our dataset
+ :dedent: 2
+
+.. note::
+
+ This partitioning scheme of key=value is referred to as "hive"
+ partitioning within Arrow.
+
+Now that we have a filesystem and a selector we can go ahead and create
+a dataset. To do this we need to pick a format and a partitioning
+scheme. Once we have all of the pieces we need we can create an
+arrow::dataset::Dataset instance.
+
+.. recipe:: ../code/datasets.cc CreatingADataset
+ :caption: Creating an arrow::dataset::Dataset instance
+ :dedent: 2
+
+Once we have a dataset object we can read in the data. Reading the data
+from a dataset is sometimes called "scanning" the dataset and the object
+we use to do this is an arrow::dataset::Scanner. The following snippet
+shows how to scan the entire dataset into an in-memory table:
+
+.. recipe:: ../code/datasets.cc ScanningADataset
+ :caption: Scanning a dataset into an arrow::Table
+ :dedent: 2
diff --git a/cpp/source/index.rst b/cpp/source/index.rst
index 023f6c8..dc075ba 100644
--- a/cpp/source/index.rst
+++ b/cpp/source/index.rst
@@ -14,6 +14,7 @@
.. KIND, either express or implied. See the License for the
.. specific language governing permissions and limitations
.. under the License.
+
Apache Arrow C++ Cookbook
=========================
@@ -28,6 +29,7 @@ serve as robust and well performing solutions to those tasks.
basic
create
+ datasets
Indices and tables
==================
diff --git a/testdata/airquality.parquet b/testdata/airquality.parquet
new file mode 100644
index 0000000..4b24134
Binary files /dev/null and b/testdata/airquality.parquet differ