westonpace commented on code in PR #14174:
URL: https://github.com/apache/arrow/pull/14174#discussion_r1085413353
##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -88,26 +88,53 @@ using internal::checked_cast;
using internal::hash_combine;
namespace engine {
+const auto kNullConsumer = std::make_shared<compute::NullSinkNodeConsumer>();
+
void WriteIpcData(const std::string& path,
const std::shared_ptr<fs::FileSystem> file_system,
const std::shared_ptr<Table> input) {
- EXPECT_OK_AND_ASSIGN(auto mmap, file_system->OpenOutputStream(path));
+ EXPECT_OK_AND_ASSIGN(auto out_stream, file_system->OpenOutputStream(path));
ASSERT_OK_AND_ASSIGN(
auto file_writer,
- MakeFileWriter(mmap, input->schema(), ipc::IpcWriteOptions::Defaults()));
- TableBatchReader reader(input);
- std::shared_ptr<RecordBatch> batch;
- while (true) {
- ASSERT_OK(reader.ReadNext(&batch));
- if (batch == nullptr) {
- break;
- }
- ASSERT_OK(file_writer->WriteRecordBatch(*batch));
- }
+ MakeFileWriter(out_stream, input->schema(),
ipc::IpcWriteOptions::Defaults()));
+ ASSERT_OK(file_writer->WriteTable(*input));
ASSERT_OK(file_writer->Close());
}
-const auto kNullConsumer = std::make_shared<compute::NullSinkNodeConsumer>();
+void CheckRoundTripResult(const std::shared_ptr<Table> expected_table,
+ std::shared_ptr<Buffer>& buf,
+ const std::vector<int>& include_columns = {},
+ const ConversionOptions& conversion_options = {},
+ const compute::SortOptions* sort_options = NULLPTR) {
+ std::shared_ptr<ExtensionIdRegistry> sp_ext_id_reg =
MakeExtensionIdRegistry();
+ ExtensionIdRegistry* ext_id_reg = sp_ext_id_reg.get();
+ ExtensionSet ext_set(ext_id_reg);
+ ASSERT_OK_AND_ASSIGN(auto sink_decls, DeserializePlans(
+ *buf, [] { return kNullConsumer; },
+ ext_id_reg, &ext_set,
conversion_options));
+ auto& other_declrs = std::get<compute::Declaration>(sink_decls[0].inputs[0]);
+
+ ASSERT_OK_AND_ASSIGN(auto output_table,
+ compute::DeclarationToTable(other_declrs,
/*use_threads=*/false));
+
+ if (!include_columns.empty()) {
+ ASSERT_OK_AND_ASSIGN(output_table,
output_table->SelectColumns(include_columns));
+ }
+ if (sort_options) {
+ ASSERT_OK_AND_ASSIGN(auto sort_indices,
+ SortIndices(output_table, std::move(*sort_options)));
+ ASSERT_OK_AND_ASSIGN(auto maybe_table,
+ compute::Take(output_table, std::move(sort_indices),
+ compute::TakeOptions::NoBoundsCheck()));
+ output_table = maybe_table.table();
+ }
+ ASSERT_OK_AND_ASSIGN(output_table, output_table->CombineChunks());
+ ASSERT_OK_AND_ASSIGN(auto merged_expected, expected_table->CombineChunks());
+ std::cout << merged_expected->ToString() << std::endl;
+ std::cout << "----------------------" << std::endl;
+ std::cout << output_table->ToString() << std::endl;
Review Comment:
We should probably remove these debugging prints (the three `std::cout` statements above).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]