westonpace commented on code in PR #14264:
URL: https://github.com/apache/arrow/pull/14264#discussion_r993993280


##########
cpp/src/arrow/dataset/scanner_test.cc:
##########
@@ -822,6 +825,25 @@ TEST(TestNewScanner, MissingColumn) {
   AssertArraysEqual(*expected_nulls, *batches[0]->column(1));
 }
 
+void WriteIpcData(const std::string& path,
+                  const std::shared_ptr<fs::FileSystem> file_system,
+                  const std::shared_ptr<Table> input) {
+  EXPECT_OK_AND_ASSIGN(auto mmap, file_system->OpenOutputStream(path));
+  ASSERT_OK_AND_ASSIGN(
+      auto file_writer,
+      MakeFileWriter(mmap, input->schema(), ipc::IpcWriteOptions::Defaults()));

Review Comment:
   ```suggestion
     EXPECT_OK_AND_ASSIGN(auto out_stream, file_system->OpenOutputStream(path));
     ASSERT_OK_AND_ASSIGN(
         auto file_writer,
         MakeFileWriter(out_stream, input->schema(), 
ipc::IpcWriteOptions::Defaults()));
   ```



##########
cpp/src/arrow/dataset/scanner.cc:
##########
@@ -115,6 +115,35 @@ const FieldVector kAugmentedFields{
     field("__filename", utf8()),
 };
 
+Result<FieldVector> GetProjectedFields(const std::shared_ptr<ScanOptions>& 
scan_options,

Review Comment:
   Minor nit: I think this would be more readable if it were named
`GetProjectedSchemaFromExpression`, took `const compute::Expression& projection`
instead of `const std::shared_ptr<ScanOptions>& scan_options`, and returned
`Result<std::shared_ptr<Schema>>`.



##########
cpp/src/arrow/dataset/scanner_test.cc:
##########
@@ -2758,5 +2789,77 @@ TEST(ScanNode, MinimalGroupedAggEndToEnd) {
   AssertTablesEqual(*expected, *sorted.table(), /*same_chunk_layout=*/false);
 }
 
+TEST(ScanNode, OnlyLoadProjectedFields) {
+  compute::ExecContext exec_context;
+  arrow::dataset::internal::Initialize();
+  ASSERT_OK_AND_ASSIGN(auto plan, compute::ExecPlan::Make());
+
+  auto dummy_schema = schema(
+      {field("key", int64()), field("shared", int64()), field("distinct", 
int64())});
+
+  // creating a dummy dataset using a dummy table
+  auto table = TableFromJSON(dummy_schema, {R"([
+      [1, 1, 10],
+      [3, 4, 20]
+    ])"});
+
+  auto format = std::make_shared<arrow::dataset::IpcFileFormat>();
+  auto filesystem = std::make_shared<fs::LocalFileSystem>();
+  const std::string file_name = "plan_scan_disk_test.arrow";
+
+  ASSERT_OK_AND_ASSIGN(auto tempdir,
+                       
arrow::internal::TemporaryDir::Make("plan-test-tempdir-"));
+  ASSERT_OK_AND_ASSIGN(auto file_path, tempdir->path().Join(file_name));
+  std::string file_path_str = file_path.ToString();
+
+  WriteIpcData(file_path_str, filesystem, table);
+
+  std::vector<fs::FileInfo> files;
+  const std::vector<std::string> f_paths = {file_path_str};
+
+  for (const auto& f_path : f_paths) {
+    ASSERT_OK_AND_ASSIGN(auto f_file, filesystem->GetFileInfo(f_path));
+    files.push_back(std::move(f_file));
+  }
+
+  ASSERT_OK_AND_ASSIGN(auto ds_factory, 
dataset::FileSystemDatasetFactory::Make(
+                                            filesystem, std::move(files), 
format, {}));
+  ASSERT_OK_AND_ASSIGN(auto dataset, ds_factory->Finish(dummy_schema));
+
+  auto scan_options = std::make_shared<dataset::ScanOptions>();
+  compute::Expression extract_expr = compute::field_ref("shared");
+  // don't use a function.
+  scan_options->projection =
+      call("make_struct", {extract_expr}, 
compute::MakeStructOptions{{"shared"}});
+
+  arrow::AsyncGenerator<std::optional<compute::ExecBatch>> sink_gen;
+
+  auto declarations = compute::Declaration::Sequence(
+      {compute::Declaration({"scan", dataset::ScanNodeOptions{dataset, 
scan_options}}),
+       compute::Declaration({"sink", compute::SinkNodeOptions{&sink_gen}})});
+
+  ASSERT_OK_AND_ASSIGN(auto decl, declarations.AddToPlan(plan.get()));
+
+  ASSERT_OK(decl->Validate());
+
+  auto output_schema =
+      schema({field("a", int64()), field("b", int64()), field("c", int64())});
+
+  auto expected = TableFromJSON(dummy_schema, {R"([
+      [null, 1, null],
+      [null, 4, null]
+  ])"});
+
+  std::shared_ptr<arrow::RecordBatchReader> sink_reader = 
compute::MakeGeneratorReader(
+      output_schema, std::move(sink_gen), exec_context.memory_pool());
+
+  ASSERT_OK(plan->Validate());
+  ASSERT_OK(plan->StartProducing());
+  ASSERT_OK_AND_ASSIGN(auto actual,
+                       arrow::Table::FromRecordBatchReader(sink_reader.get()));
+
+  AssertTablesEqual(*expected, *actual, /*same_chunk_layout=*/false);

Review Comment:
   ```suggestion
   
     auto declarations = compute::Declaration::Sequence(
         {compute::Declaration({"scan", dataset::ScanNodeOptions{dataset, 
scan_options}})});
   
     ASSERT_OK_AND_ASSIGN(auto actual, 
compute::DeclarationToTable(declarations));
     // Scan node always emits augmented fields so we drop those
     ASSERT_OK_AND_ASSIGN(auto actualMinusAgumented, actual->SelectColumns({0, 
1, 2}));
   
     auto expected = TableFromJSON(dummy_schema, {R"([
         [null, 1, null],
         [null, 4, null]
     ])"});
   
     AssertTablesEqual(*expected, *actualMinusAgumented, 
/*same_chunk_layout=*/false);
   ```
   
   We have `DeclarationToTable` now, which is more succinct. Also, the scan
node was attaching the augmented columns all along; you were not seeing them
because the sink reader was created with a schema that declared only three
columns, so the extra columns never appeared in the output table.



##########
cpp/src/arrow/dataset/scanner_test.cc:
##########
@@ -822,6 +825,25 @@ TEST(TestNewScanner, MissingColumn) {
   AssertArraysEqual(*expected_nulls, *batches[0]->column(1));
 }
 
+void WriteIpcData(const std::string& path,
+                  const std::shared_ptr<fs::FileSystem> file_system,
+                  const std::shared_ptr<Table> input) {
+  EXPECT_OK_AND_ASSIGN(auto mmap, file_system->OpenOutputStream(path));
+  ASSERT_OK_AND_ASSIGN(
+      auto file_writer,
+      MakeFileWriter(mmap, input->schema(), ipc::IpcWriteOptions::Defaults()));
+  TableBatchReader reader(input);
+  std::shared_ptr<RecordBatch> batch;
+  while (true) {
+    ASSERT_OK(reader.ReadNext(&batch));
+    if (batch == nullptr) {
+      break;
+    }
+    ASSERT_OK(file_writer->WriteRecordBatch(*batch));
+  }
+  ASSERT_OK(file_writer->Close());

Review Comment:
   ```suggestion
     ASSERT_OK(file_writer->WriteTable(*input));
     ASSERT_OK(file_writer->Close());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to