This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-cookbook.git


The following commit(s) were added to refs/heads/main by this push:
     new a2547ea  Add a basic dataset reading example (#85)
a2547ea is described below

commit a2547eae0e51343292baaab75bedc76e307c3650
Author: Weston Pace <[email protected]>
AuthorDate: Wed Oct 27 10:46:20 2021 -1000

    Add a basic dataset reading example (#85)
    
    * Added a basic dataset reading example
    
    * Remove cmake debugging
    
    * Adding newline to end of common.cc
    
    * Apply suggestions from code review
    
    Co-authored-by: Nic Crane <[email protected]>
    
    Co-authored-by: Nic Crane <[email protected]>
---
 Makefile                    |   2 +-
 cpp/code/CMakeLists.txt     |   9 ++-
 cpp/code/common.cc          |  17 ++++-
 cpp/code/common.h           |   1 +
 cpp/code/datasets.cc        | 159 ++++++++++++++++++++++++++++++++++++++++++++
 cpp/source/basic.rst        |   1 +
 cpp/source/create.rst       |   1 +
 cpp/source/datasets.rst     |  67 +++++++++++++++++++
 cpp/source/index.rst        |   2 +
 testdata/airquality.parquet | Bin 0 -> 4799 bytes
 10 files changed, 255 insertions(+), 4 deletions(-)

diff --git a/Makefile b/Makefile
index 2acc8df..69f0e47 100644
--- a/Makefile
+++ b/Makefile
@@ -58,7 +58,7 @@ cpptest:
        @echo ">>> Running C++ Tests/Snippets <<<\n"
        rm -rf cpp/recipe-test-build
        mkdir cpp/recipe-test-build
-       cd cpp/recipe-test-build && cmake ../code -DCMAKE_BUILD_TYPE=Debug && cmake --build . && ctest -j 1
+       cd cpp/recipe-test-build && cmake ../code -DCMAKE_BUILD_TYPE=Debug && cmake --build . && ctest --output-on-failure -j 1
        mkdir -p cpp/build
        cp cpp/recipe-test-build/recipes_out.arrow cpp/build
 
diff --git a/cpp/code/CMakeLists.txt b/cpp/code/CMakeLists.txt
index e24a245..a57b6eb 100644
--- a/cpp/code/CMakeLists.txt
+++ b/cpp/code/CMakeLists.txt
@@ -18,9 +18,11 @@ set(gtest_force_shared_crt ON CACHE BOOL "" FORCE)
 FetchContent_MakeAvailable(googletest)
 
 # Add Arrow
-find_package(Arrow REQUIRED)
+find_package(Arrow REQUIRED COMPONENTS dataset parquet)
 
-set(CMAKE_CXX_CLANG_TIDY "clang-tidy")
+if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
+    set(CMAKE_CXX_CLANG_TIDY "clang-tidy")
+endif()
 
 # Create test targets
 enable_testing()
@@ -37,6 +39,8 @@ function(RECIPE TARGET)
     target_link_libraries(
             ${TARGET}
             arrow_shared
+            arrow_dataset
+            parquet
             gtest
     )
     if (MSVC)
@@ -50,3 +54,4 @@ endfunction()
 
 recipe(basic_arrow)
 recipe(creating_arrow_objects)
+recipe(datasets)
diff --git a/cpp/code/common.cc b/cpp/code/common.cc
index 43f27c0..4a79907 100644
--- a/cpp/code/common.cc
+++ b/cpp/code/common.cc
@@ -17,6 +17,7 @@
 
 #include "common.h"
 
+#include <filesystem>
 #include <sstream>
 #include <unordered_map>
 
@@ -159,4 +160,18 @@ arrow::Status DumpRecipeOutput(const std::string& output_filename) {
       arrow::ipc::MakeStreamWriter(out_stream.get(), merged_table->schema()));
   RETURN_NOT_OK(writer->WriteTable(*merged_table));
   return writer->Close();
-}
\ No newline at end of file
+}
+
+arrow::Result<std::string> FindTestDataFile(const std::string& test_data_name) {
+  auto path_iter = std::filesystem::current_path();
+  while (path_iter.has_parent_path()) {
+    auto test_data_dir_path = path_iter / "testdata";
+    if (std::filesystem::exists(test_data_dir_path)) {
+      return (test_data_dir_path / test_data_name).string();
+    }
+    path_iter = path_iter.parent_path();
+  }
+  return arrow::Status::Invalid(
+      "Could not locate testdata directory.  Tests must be "
+      "run inside of the cookbook repo");
+}
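
A minimal usage sketch (not part of the commit) showing how a recipe might
call the new FindTestDataFile helper; the LoadSampleData function name is
hypothetical:

    #include <arrow/api.h>

    #include "common.h"

    // FindTestDataFile walks up from the working directory until a
    // testdata/ folder is found, then returns the full path to the file.
    arrow::Status LoadSampleData() {
      ARROW_ASSIGN_OR_RAISE(std::string path,
                            FindTestDataFile("airquality.parquet"));
      rout << "Found test data at: " << path << std::endl;
      return arrow::Status::OK();
    }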
diff --git a/cpp/code/common.h b/cpp/code/common.h
index e9f5d9e..26d074b 100644
--- a/cpp/code/common.h
+++ b/cpp/code/common.h
@@ -49,5 +49,6 @@ void StartRecipe(const std::string& recipe_name);
 void EndRecipe(const std::string& recipe_name);
 arrow::Status DumpRecipeOutput(const std::string& output_filename);
 bool HasRecipeOutput();
+arrow::Result<std::string> FindTestDataFile(const std::string& test_data_name);
 
 #endif  // ARROW_COOKBOOK_COMMON_H
diff --git a/cpp/code/datasets.cc b/cpp/code/datasets.cc
new file mode 100644
index 0000000..cc067cc
--- /dev/null
+++ b/cpp/code/datasets.cc
@@ -0,0 +1,159 @@
+#include <arrow/api.h>
+#include <arrow/dataset/api.h>
+#include <arrow/filesystem/api.h>
+#include <gtest/gtest.h>
+#include <parquet/arrow/reader.h>
+
+#include <filesystem>
+#include <memory>
+
+#include "common.h"
+
+class DatasetReadingTest : public ::testing::Test {
+ public:
+  void SetUp() override {
+    airquality_partitioned_dir_ =
+        std::filesystem::temp_directory_path() / "cookbook_cpp_airquality";
+    std::shared_ptr<arrow::fs::FileSystem> fs =
+        std::make_shared<arrow::fs::LocalFileSystem>();
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Table> airquality,
+                         ReadInAirQuality(fs.get()));
+    WritePartitionedAirQuality(airquality, std::move(fs));
+  }
+
+  const std::string& airquality_basedir() { return airquality_partitioned_dir_; }
+
+ private:
+  void WritePartitionedAirQuality(const std::shared_ptr<arrow::Table>& airquality,
+                                  std::shared_ptr<arrow::fs::FileSystem> fs) {
+    std::shared_ptr<arrow::RecordBatchReader> table_reader =
+        std::make_shared<arrow::TableBatchReader>(*airquality);
+
+    std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
+        arrow::dataset::ScannerBuilder::FromRecordBatchReader(std::move(table_reader));
+    ASSERT_OK(scanner_builder->UseThreads(true));
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::dataset::Scanner> scanner,
+                         scanner_builder->Finish());
+
+    std::shared_ptr<arrow::Schema> partitioning_schema = arrow::schema(
+        {arrow::field("Month", arrow::int32()), arrow::field("Day", arrow::int32())});
+    std::shared_ptr<arrow::dataset::PartitioningFactory> partitioning_factory =
+        arrow::dataset::HivePartitioning::MakeFactory();
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::dataset::Partitioning> partitioning,
+                         partitioning_factory->Finish(partitioning_schema));
+
+    std::shared_ptr<arrow::dataset::ParquetFileFormat> parquet_format =
+        std::make_shared<arrow::dataset::ParquetFileFormat>();
+
+    arrow::dataset::FileSystemDatasetWriteOptions write_options;
+    write_options.filesystem = std::move(fs);
+    write_options.partitioning = std::move(partitioning);
+    write_options.base_dir = airquality_partitioned_dir_;
+    write_options.basename_template = "chunk-{i}.parquet";
+    write_options.file_write_options = parquet_format->DefaultWriteOptions();
+
+    ASSERT_OK(
+        arrow::dataset::FileSystemDataset::Write(write_options, std::move(scanner)));
+  }
+
+  static arrow::Result<std::shared_ptr<arrow::Table>> ReadInAirQuality(
+      arrow::fs::FileSystem* fs) {
+    ARROW_ASSIGN_OR_RAISE(std::string airquality_path,
+                          FindTestDataFile("airquality.parquet"));
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::RandomAccessFile> file,
+                          fs->OpenInputFile(airquality_path));
+    std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
+        parquet::ParquetFileReader::Open(file);
+    std::unique_ptr<parquet::arrow::FileReader> reader;
+    ARROW_RETURN_NOT_OK(parquet::arrow::FileReader::Make(
+        arrow::default_memory_pool(), std::move(parquet_reader), &reader));
+    std::shared_ptr<arrow::Table> table;
+    ARROW_RETURN_NOT_OK(reader->ReadTable(&table));
+    return table;
+  }
+
+  std::string airquality_partitioned_dir_;
+};
+
+TEST_F(DatasetReadingTest, DatasetRead) {
+  StartRecipe("ListPartitionedDataset");
+  const std::string& directory_base = airquality_basedir();
+
+  // Create a filesystem
+  std::shared_ptr<arrow::fs::LocalFileSystem> fs =
+      std::make_shared<arrow::fs::LocalFileSystem>();
+
+  // Create a file selector which describes which files are part of
+  // the dataset.  This selector performs a recursive search of a base
+  // directory which is typical with partitioned datasets.  You can also
+  // create a dataset from a list of one or more paths.
+  arrow::fs::FileSelector selector;
+  selector.base_dir = directory_base;
+  selector.recursive = true;
+
+  // List out the files so we can see how our data is partitioned.
+  // This step is not necessary for reading a dataset
+  ASSERT_OK_AND_ASSIGN(std::vector<arrow::fs::FileInfo> file_infos,
+                       fs->GetFileInfo(selector));
+  int num_printed = 0;
+  for (const auto& path : file_infos) {
+    if (path.IsFile()) {
+      rout << path.path().substr(directory_base.size()) << std::endl;
+      if (++num_printed == 10) {
+        rout << "..." << std::endl;
+        break;
+      }
+    }
+  }
+
+  EndRecipe("ListPartitionedDataset");
+  StartRecipe("CreatingADataset");
+  // Create a file format which describes the format of the files.
+  // Here we specify we are reading parquet files.  We could pick a different format
+  // such as Arrow-IPC files or CSV files or we could customize the parquet format with
+  // additional reading & parsing options.
+  std::shared_ptr<arrow::dataset::ParquetFileFormat> format =
+      std::make_shared<arrow::dataset::ParquetFileFormat>();
+
+  // Create a partitioning factory.  A partitioning factory will be used by a dataset
+  // factory to infer the partitioning schema from the filenames.  All we need to specify
+  // is the flavor of partitioning which, in our case, is "hive".
+  //
+  // Alternatively, we could manually create a partitioning scheme from a schema.  This is
+  // typically not necessary for hive partitioning as inference works well.
+  std::shared_ptr<arrow::dataset::PartitioningFactory> partitioning_factory =
+      arrow::dataset::HivePartitioning::MakeFactory();
+
+  arrow::dataset::FileSystemFactoryOptions options;
+  options.partitioning = partitioning_factory;
+
+  // Create a dataset factory
+  ASSERT_OK_AND_ASSIGN(
+      std::shared_ptr<arrow::dataset::DatasetFactory> dataset_factory,
+      arrow::dataset::FileSystemDatasetFactory::Make(fs, selector, format, options));
+
+  // Create the dataset; this will scan the dataset directory to find all the files
+  // and may scan some file metadata in order to determine the dataset schema.
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::dataset::Dataset> dataset,
+                       dataset_factory->Finish());
+
+  rout << "We discovered the following schema for the dataset:" << std::endl
+       << std::endl
+       << dataset->schema()->ToString() << std::endl;
+  EndRecipe("CreatingADataset");
+  StartRecipe("ScanningADataset");
+
+  // Create a scanner
+  arrow::dataset::ScannerBuilder scanner_builder(dataset);
+  ASSERT_OK(scanner_builder.UseAsync(true));
+  ASSERT_OK(scanner_builder.UseThreads(true));
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::dataset::Scanner> scanner,
+                       scanner_builder.Finish());
+
+  // Scan the dataset.  There are a variety of other methods available on the
+  // scanner as well.
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Table> table, scanner->ToTable());
+  rout << "Read in a table with " << table->num_rows() << " rows and "
+       << table->num_columns() << " columns";
+  EndRecipe("ScanningADataset");
+}
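
Not shown in the recipe above: the same scanner could be narrowed before
reading.  A sketch, assuming the Arrow 6.x compute expression API (the
header later moved to arrow/compute/expression.h) and a hypothetical
ReadMayOzone helper:

    #include <arrow/compute/exec/expression.h>
    #include <arrow/dataset/api.h>

    // Read only the Ozone column for May.  Because Month is a partition
    // column, the filter can prune whole directories before any parquet
    // file is opened.
    arrow::Result<std::shared_ptr<arrow::Table>> ReadMayOzone(
        const std::shared_ptr<arrow::dataset::Dataset>& dataset) {
      arrow::dataset::ScannerBuilder scanner_builder(dataset);
      ARROW_RETURN_NOT_OK(scanner_builder.Filter(arrow::compute::equal(
          arrow::compute::field_ref("Month"), arrow::compute::literal(5))));
      ARROW_RETURN_NOT_OK(scanner_builder.Project({"Ozone"}));
      ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Scanner> scanner,
                            scanner_builder.Finish());
      return scanner->ToTable();
    }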
diff --git a/cpp/source/basic.rst b/cpp/source/basic.rst
index 913781a..b91cc0e 100644
--- a/cpp/source/basic.rst
+++ b/cpp/source/basic.rst
@@ -14,6 +14,7 @@
 .. KIND, either express or implied.  See the License for the
 .. specific language governing permissions and limitations
 .. under the License.
+
 ===================================
 Working with the C++ Implementation
 ===================================
diff --git a/cpp/source/create.rst b/cpp/source/create.rst
index 5e5483c..393747b 100644
--- a/cpp/source/create.rst
+++ b/cpp/source/create.rst
@@ -14,6 +14,7 @@
 .. KIND, either express or implied.  See the License for the
 .. specific language governing permissions and limitations
 .. under the License.
+
 ======================
 Creating Arrow Objects
 ======================
diff --git a/cpp/source/datasets.rst b/cpp/source/datasets.rst
new file mode 100644
index 0000000..f434da5
--- /dev/null
+++ b/cpp/source/datasets.rst
@@ -0,0 +1,67 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied.  See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+============================
+Reading and Writing Datasets
+============================
+
+This section contains a number of recipes for reading and writing
+datasets.  A dataset is a collection of one or more files containing
+tabular data.
+
+.. contents::
+
+Read a Partitioned Dataset
+==========================
+
+The individual data files that make up a dataset will often be
+distributed across several different directories according to some
+kind of partitioning scheme.
+
+This simplifies management of the data and also allows for partial
+reads of the dataset by inspecting the file paths and utilizing the
+guarantees provided by the partitioning scheme.
+
+This recipe demonstrates the basics of reading a partitioned dataset.
+First let us inspect our data:
+
+.. recipe:: ../code/datasets.cc ListPartitionedDataset
+  :caption: A listing of files in our dataset
+  :dedent: 2
+
+.. note::
+
+    This partitioning scheme of key=value is referred to as "hive"
+    partitioning within Arrow.
+
+Now that we have a filesystem and a selector, we can go ahead and create
+a dataset.  To do this we need to pick a format and a partitioning
+scheme.  Once we have all of the pieces we need, we can create an
+arrow::dataset::Dataset instance.
+
+.. recipe:: ../code/datasets.cc CreatingADataset
+  :caption: Creating an arrow::dataset::Dataset instance
+  :dedent: 2
+
+Once we have a dataset object, we can read in the data.  Reading the data
+from a dataset is sometimes called "scanning" the dataset, and the object
+we use to do this is an arrow::dataset::Scanner.  The following snippet
+shows how to scan the entire dataset into an in-memory table:
+
+.. recipe:: ../code/datasets.cc ScanningADataset
+  :caption: Scanning a dataset into an arrow::Table
+  :dedent: 2
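
The recipe relies on partitioning inference.  As a sketch (not part of
this commit), the Month/Day hive partitioning could instead be declared
explicitly, which skips inference when the partition schema is already
known:

    #include <arrow/dataset/api.h>

    // Declare the partitioning up front instead of inferring it from
    // the file paths.
    std::shared_ptr<arrow::dataset::Partitioning> partitioning =
        std::make_shared<arrow::dataset::HivePartitioning>(
            arrow::schema({arrow::field("Month", arrow::int32()),
                           arrow::field("Day", arrow::int32())}));

    arrow::dataset::FileSystemFactoryOptions options;
    options.partitioning = partitioning;  // accepts a Partitioning or a factory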
diff --git a/cpp/source/index.rst b/cpp/source/index.rst
index 023f6c8..dc075ba 100644
--- a/cpp/source/index.rst
+++ b/cpp/source/index.rst
@@ -14,6 +14,7 @@
 .. KIND, either express or implied.  See the License for the
 .. specific language governing permissions and limitations
 .. under the License.
+
 Apache Arrow C++ Cookbook
 =========================
 
@@ -28,6 +29,7 @@ serve as robust and well performing solutions to those tasks.
 
    basic
    create
+   datasets
 
 Indices and tables
 ==================
diff --git a/testdata/airquality.parquet b/testdata/airquality.parquet
new file mode 100644
index 0000000..4b24134
Binary files /dev/null and b/testdata/airquality.parquet differ
