lidavidm commented on a change in pull request #10927:
URL: https://github.com/apache/arrow/pull/10927#discussion_r690324959
##########
File path: cpp/src/arrow/compute/exec/union_node.cc
##########
@@ -48,31 +48,29 @@ struct UnionNode : ExecNode {
: ExecNode(plan, inputs, GetInputLabels(inputs),
/*output_schema=*/inputs[0]->output_schema(),
/*num_outputs=*/1) {
- if (this->input_count_.SetTotal(static_cast<int>(inputs.size()))) {
- finished_.MarkFinished();
- }
+ ARROW_DCHECK(this->input_count_.SetTotal(static_cast<int>(inputs.size()))
== false);
Review comment:
This won't work: ARROW_DCHECK is compiled out in release mode, so the
SetTotal() call wrapped inside it would never be executed in release builds.
##########
File path: cpp/src/arrow/compute/exec/union_node_test.cc
##########
@@ -17,187 +17,126 @@
#include <gmock/gmock-matchers.h>
-#include <iostream>
-
#include "arrow/api.h"
#include "arrow/compute/exec/options.h"
#include "arrow/compute/exec/test_util.h"
-#include "arrow/pretty_print.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/matchers.h"
+#include "arrow/testing/random.h"
using testing::UnorderedElementsAreArray;
namespace arrow {
namespace compute {
-void GenerateBatchesFromString(const std::shared_ptr<Schema>& schema,
- const std::vector<util::string_view>&
json_strings,
- BatchesWithSchema* out_batches, int
multiplicity = 1) {
- std::vector<ValueDescr> descrs;
- for (auto&& field : schema->fields()) {
- descrs.emplace_back(field->type());
- }
+struct TestUnionNode : public ::testing::Test {
+ TestUnionNode() : rng_(seed_) {}
- for (auto&& s : json_strings) {
- out_batches->batches.push_back(ExecBatchFromJSON(descrs, s));
- }
+ std::shared_ptr<Schema> GenerateRandomSchema(size_t num_inputs) {
+ static std::vector<std::shared_ptr<DataType>> some_arrow_types = {
+ arrow::null(), arrow::boolean(), arrow::int8(), arrow::int16(),
+ arrow::int32(), arrow::int64(), arrow::float16(), arrow::float32(),
+ arrow::float64(), arrow::utf8(), arrow::binary(), arrow::date32()};
- size_t batch_count = out_batches->batches.size();
- for (int repeat = 1; repeat < multiplicity; ++repeat) {
- for (size_t i = 0; i < batch_count; ++i) {
- out_batches->batches.push_back(out_batches->batches[i]);
+ std::vector<std::shared_ptr<Field>> fields(num_inputs);
+ for (size_t i = 0; i < num_inputs; i++) {
+ auto col_type = some_arrow_types.at(rand() % some_arrow_types.size());
+ fields[i] =
+ field("column_" + std::to_string(i) + "_" + col_type->ToString(),
col_type);
}
+ return schema(fields);
}
- out_batches->schema = schema;
-}
-
-void CheckRunOutput(const BatchesWithSchema& l_batches,
- const BatchesWithSchema& r_batches,
- const BatchesWithSchema& exp_batches, bool parallel =
false) {
- SCOPED_TRACE(parallel ? "parallel" : "single threaded");
-
- ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
-
- Declaration union_decl{"union", ExecNodeOptions{}};
-
- // add left source
- union_decl.inputs.emplace_back(Declaration{
- "source", SourceNodeOptions{l_batches.schema, l_batches.gen(parallel,
-
/*slow=*/false)}});
- // add right source
- union_decl.inputs.emplace_back(Declaration{
- "source", SourceNodeOptions{r_batches.schema, r_batches.gen(parallel,
-
/*slow=*/false)}});
- AsyncGenerator<util::optional<ExecBatch>> sink_gen;
-
- ASSERT_OK(Declaration::Sequence({union_decl, {"sink",
SinkNodeOptions{&sink_gen}}})
- .AddToPlan(plan.get()));
-
- Future<std::vector<ExecBatch>> actual = StartAndCollect(plan.get(),
sink_gen);
-
- auto expected_matcher =
- Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches)));
- ASSERT_THAT(actual, expected_matcher);
-}
-
-void RunNonEmptyTest(bool parallel) {
- auto l_schema = schema({field("colum_i32", int32()), field("colum_str",
utf8())});
- auto r_schema = schema({field("colum_i32", int32()), field("colum_str",
utf8())});
- BatchesWithSchema l_batches, r_batches, exp_batches;
-
- int multiplicity = parallel ? 100 : 1;
-
- GenerateBatchesFromString(l_schema,
- {
- R"([[0,"d"], [1,"b"]])",
- R"([[2,"d"], [3,"a"], [4,"a"]])",
- },
- &l_batches, multiplicity);
-
- GenerateBatchesFromString(r_schema,
- {
- R"([[10,"A"]])",
- },
- &r_batches, multiplicity);
-
- GenerateBatchesFromString(l_schema,
- {
- R"([[0,"d"], [1,"b"]])",
- R"([[2,"d"], [3,"a"], [4,"a"]])",
-
- R"([[10,"A"]])",
- },
- &exp_batches, multiplicity);
- CheckRunOutput(l_batches, r_batches, exp_batches, parallel);
-}
+ void GenerateBatchesFromSchema(const std::shared_ptr<Schema>& schema,
+ size_t num_batches, BatchesWithSchema*
out_batches,
+ int multiplicity = 1, int64_t batch_size = 4)
{
+ if (num_batches == 0) {
+ auto empty_record_batch = ExecBatch(*rng_.BatchOf(schema->fields(), 0));
+ out_batches->batches.push_back(empty_record_batch);
+ } else {
+ for (size_t j = 0; j < num_batches; j++) {
+ out_batches->batches.push_back(
+ ExecBatch(*rng_.BatchOf(schema->fields(), batch_size)));
+ }
+ }
-void RunEmptyTest(bool parallel) {
- auto l_schema = schema({field("colum_i32", int32()), field("colum_str",
utf8())});
- auto r_schema = schema({field("colum_i32", int32()), field("colum_str",
utf8())});
+ size_t batch_count = out_batches->batches.size();
+ for (int repeat = 1; repeat < multiplicity; ++repeat) {
+ for (size_t i = 0; i < batch_count; ++i) {
+ out_batches->batches.push_back(out_batches->batches[i]);
+ }
+ }
+ out_batches->schema = schema;
+ }
- int multiplicity = parallel ? 100 : 1;
+ void CheckRunOutput(const std::vector<BatchesWithSchema>& batches,
+ const BatchesWithSchema& exp_batches, bool parallel =
false) {
+ SCOPED_TRACE(parallel ? "parallel" : "single threaded");
- BatchesWithSchema l_empty, r_empty, output_batches;
+ ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
- GenerateBatchesFromString(l_schema, {R"([])"}, &l_empty, multiplicity);
- GenerateBatchesFromString(r_schema, {R"([])"}, &r_empty, multiplicity);
+ Declaration union_decl{"union", ExecNodeOptions{}};
- GenerateBatchesFromString(l_schema, {R"([])", R"([])"}, &output_batches,
multiplicity);
+ for (const auto& batch : batches) {
+ union_decl.inputs.emplace_back(Declaration{
+ "source", SourceNodeOptions{batch.schema, batch.gen(parallel,
+
/*slow=*/false)}});
+ }
+ AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+ // Test UnionNode::Make with zero inputs
+ if (batches.size() == 0) {
+ ASSERT_RAISES(Invalid, Declaration::Sequence(
+ {union_decl, {"sink",
SinkNodeOptions{&sink_gen}}})
+ .AddToPlan(plan.get()));
+ return;
+ } else {
+ ASSERT_OK(Declaration::Sequence({union_decl, {"sink",
SinkNodeOptions{&sink_gen}}})
+ .AddToPlan(plan.get()));
+ }
- CheckRunOutput(l_empty, r_empty, output_batches);
-}
+ Future<std::vector<ExecBatch>> actual = StartAndCollect(plan.get(),
sink_gen);
-TEST(UnionTest, TestNonEmpty) {
- for (bool parallel : {false, true}) {
- RunNonEmptyTest(parallel);
+ auto expected_matcher =
+ Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches)));
+ ASSERT_THAT(actual, expected_matcher);
}
-}
-TEST(UnionTest, TestEmpty) {
- for (bool parallel : {false, true}) {
- RunEmptyTest(parallel);
+ void CheckUnionExecNode(size_t num_input_nodes, size_t num_batches, bool
parallel) {
+ auto random_schema = GenerateRandomSchema(num_input_nodes);
+
+ int multiplicity = parallel ? 10 : 1;
+ std::vector<std::shared_ptr<RecordBatch>> all_record_batches;
+ std::vector<BatchesWithSchema> input_batches(num_input_nodes);
+ BatchesWithSchema exp_batches;
+ exp_batches.schema = random_schema;
+ for (size_t i = 0; i < num_input_nodes; i++) {
+ GenerateBatchesFromSchema(random_schema, num_batches, &input_batches[i],
+ multiplicity, kBatchSize);
+ for (const auto& batch : input_batches[i].batches) {
+ exp_batches.batches.push_back(batch);
+ }
+ }
+ CheckRunOutput(input_batches, exp_batches, parallel);
}
-}
-
-void TestUnionRandom(const std::shared_ptr<DataType>& data_type, bool parallel,
- int num_batches, int batch_size) {
- auto l_schema = schema({field("colum0", data_type), field("colum1",
data_type)});
- auto r_schema = schema({field("colum0", data_type), field("colum1",
data_type)});
-
- // generate data
- auto l_batches = MakeRandomBatches(l_schema, num_batches, batch_size);
- auto r_batches = MakeRandomBatches(r_schema, num_batches, batch_size);
-
- ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
-
- Declaration Union{"union", ExecNodeOptions{}};
-
- // add left source
- Union.inputs.emplace_back(Declaration{
- "source", SourceNodeOptions{l_batches.schema, l_batches.gen(parallel,
-
/*slow=*/false)}});
- // add right source
- Union.inputs.emplace_back(Declaration{
- "source", SourceNodeOptions{r_batches.schema, r_batches.gen(parallel,
-
/*slow=*/false)}});
- AsyncGenerator<util::optional<ExecBatch>> sink_gen;
- ASSERT_OK(Declaration::Sequence({Union, {"sink",
SinkNodeOptions{&sink_gen}}})
- .AddToPlan(plan.get()));
+ ::arrow::random::SeedType seed_ = 0xdeadbeef;
+ ::arrow::random::RandomArrayGenerator rng_;
- auto actual = StartAndCollect(plan.get(), sink_gen);
+ static constexpr int kNumBatches = 10;
Review comment:
Nit: put constants towards the top of the class declaration, before the
member functions. See
https://google.github.io/styleguide/cppguide.html#Declaration_Order
##########
File path: cpp/src/arrow/compute/exec/union_node_test.cc
##########
@@ -17,187 +17,126 @@
#include <gmock/gmock-matchers.h>
-#include <iostream>
-
#include "arrow/api.h"
#include "arrow/compute/exec/options.h"
#include "arrow/compute/exec/test_util.h"
-#include "arrow/pretty_print.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/matchers.h"
+#include "arrow/testing/random.h"
using testing::UnorderedElementsAreArray;
namespace arrow {
namespace compute {
-void GenerateBatchesFromString(const std::shared_ptr<Schema>& schema,
- const std::vector<util::string_view>&
json_strings,
- BatchesWithSchema* out_batches, int
multiplicity = 1) {
- std::vector<ValueDescr> descrs;
- for (auto&& field : schema->fields()) {
- descrs.emplace_back(field->type());
- }
+struct TestUnionNode : public ::testing::Test {
+ TestUnionNode() : rng_(seed_) {}
- for (auto&& s : json_strings) {
- out_batches->batches.push_back(ExecBatchFromJSON(descrs, s));
- }
+ std::shared_ptr<Schema> GenerateRandomSchema(size_t num_inputs) {
+ static std::vector<std::shared_ptr<DataType>> some_arrow_types = {
+ arrow::null(), arrow::boolean(), arrow::int8(), arrow::int16(),
+ arrow::int32(), arrow::int64(), arrow::float16(), arrow::float32(),
+ arrow::float64(), arrow::utf8(), arrow::binary(), arrow::date32()};
- size_t batch_count = out_batches->batches.size();
- for (int repeat = 1; repeat < multiplicity; ++repeat) {
- for (size_t i = 0; i < batch_count; ++i) {
- out_batches->batches.push_back(out_batches->batches[i]);
+ std::vector<std::shared_ptr<Field>> fields(num_inputs);
+ for (size_t i = 0; i < num_inputs; i++) {
+ auto col_type = some_arrow_types.at(rand() % some_arrow_types.size());
+ fields[i] =
+ field("column_" + std::to_string(i) + "_" + col_type->ToString(),
col_type);
}
+ return schema(fields);
}
- out_batches->schema = schema;
-}
-
-void CheckRunOutput(const BatchesWithSchema& l_batches,
- const BatchesWithSchema& r_batches,
- const BatchesWithSchema& exp_batches, bool parallel =
false) {
- SCOPED_TRACE(parallel ? "parallel" : "single threaded");
-
- ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
-
- Declaration union_decl{"union", ExecNodeOptions{}};
-
- // add left source
- union_decl.inputs.emplace_back(Declaration{
- "source", SourceNodeOptions{l_batches.schema, l_batches.gen(parallel,
-
/*slow=*/false)}});
- // add right source
- union_decl.inputs.emplace_back(Declaration{
- "source", SourceNodeOptions{r_batches.schema, r_batches.gen(parallel,
-
/*slow=*/false)}});
- AsyncGenerator<util::optional<ExecBatch>> sink_gen;
-
- ASSERT_OK(Declaration::Sequence({union_decl, {"sink",
SinkNodeOptions{&sink_gen}}})
- .AddToPlan(plan.get()));
-
- Future<std::vector<ExecBatch>> actual = StartAndCollect(plan.get(),
sink_gen);
-
- auto expected_matcher =
- Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches)));
- ASSERT_THAT(actual, expected_matcher);
-}
-
-void RunNonEmptyTest(bool parallel) {
- auto l_schema = schema({field("colum_i32", int32()), field("colum_str",
utf8())});
- auto r_schema = schema({field("colum_i32", int32()), field("colum_str",
utf8())});
- BatchesWithSchema l_batches, r_batches, exp_batches;
-
- int multiplicity = parallel ? 100 : 1;
-
- GenerateBatchesFromString(l_schema,
- {
- R"([[0,"d"], [1,"b"]])",
- R"([[2,"d"], [3,"a"], [4,"a"]])",
- },
- &l_batches, multiplicity);
-
- GenerateBatchesFromString(r_schema,
- {
- R"([[10,"A"]])",
- },
- &r_batches, multiplicity);
-
- GenerateBatchesFromString(l_schema,
- {
- R"([[0,"d"], [1,"b"]])",
- R"([[2,"d"], [3,"a"], [4,"a"]])",
-
- R"([[10,"A"]])",
- },
- &exp_batches, multiplicity);
- CheckRunOutput(l_batches, r_batches, exp_batches, parallel);
-}
+ void GenerateBatchesFromSchema(const std::shared_ptr<Schema>& schema,
+ size_t num_batches, BatchesWithSchema*
out_batches,
+ int multiplicity = 1, int64_t batch_size = 4)
{
+ if (num_batches == 0) {
+ auto empty_record_batch = ExecBatch(*rng_.BatchOf(schema->fields(), 0));
+ out_batches->batches.push_back(empty_record_batch);
+ } else {
+ for (size_t j = 0; j < num_batches; j++) {
+ out_batches->batches.push_back(
+ ExecBatch(*rng_.BatchOf(schema->fields(), batch_size)));
+ }
+ }
-void RunEmptyTest(bool parallel) {
- auto l_schema = schema({field("colum_i32", int32()), field("colum_str",
utf8())});
- auto r_schema = schema({field("colum_i32", int32()), field("colum_str",
utf8())});
+ size_t batch_count = out_batches->batches.size();
+ for (int repeat = 1; repeat < multiplicity; ++repeat) {
+ for (size_t i = 0; i < batch_count; ++i) {
+ out_batches->batches.push_back(out_batches->batches[i]);
+ }
+ }
+ out_batches->schema = schema;
+ }
- int multiplicity = parallel ? 100 : 1;
+ void CheckRunOutput(const std::vector<BatchesWithSchema>& batches,
+ const BatchesWithSchema& exp_batches, bool parallel =
false) {
+ SCOPED_TRACE(parallel ? "parallel" : "single threaded");
- BatchesWithSchema l_empty, r_empty, output_batches;
+ ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
- GenerateBatchesFromString(l_schema, {R"([])"}, &l_empty, multiplicity);
- GenerateBatchesFromString(r_schema, {R"([])"}, &r_empty, multiplicity);
+ Declaration union_decl{"union", ExecNodeOptions{}};
- GenerateBatchesFromString(l_schema, {R"([])", R"([])"}, &output_batches,
multiplicity);
+ for (const auto& batch : batches) {
+ union_decl.inputs.emplace_back(Declaration{
+ "source", SourceNodeOptions{batch.schema, batch.gen(parallel,
+
/*slow=*/false)}});
+ }
+ AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+ // Test UnionNode::Make with zero inputs
+ if (batches.size() == 0) {
+ ASSERT_RAISES(Invalid, Declaration::Sequence(
+ {union_decl, {"sink",
SinkNodeOptions{&sink_gen}}})
+ .AddToPlan(plan.get()));
+ return;
+ } else {
+ ASSERT_OK(Declaration::Sequence({union_decl, {"sink",
SinkNodeOptions{&sink_gen}}})
+ .AddToPlan(plan.get()));
+ }
- CheckRunOutput(l_empty, r_empty, output_batches);
-}
+ Future<std::vector<ExecBatch>> actual = StartAndCollect(plan.get(),
sink_gen);
-TEST(UnionTest, TestNonEmpty) {
- for (bool parallel : {false, true}) {
- RunNonEmptyTest(parallel);
+ auto expected_matcher =
+ Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches)));
+ ASSERT_THAT(actual, expected_matcher);
}
-}
-TEST(UnionTest, TestEmpty) {
- for (bool parallel : {false, true}) {
- RunEmptyTest(parallel);
+ void CheckUnionExecNode(size_t num_input_nodes, size_t num_batches, bool
parallel) {
+ auto random_schema = GenerateRandomSchema(num_input_nodes);
+
+ int multiplicity = parallel ? 10 : 1;
+ std::vector<std::shared_ptr<RecordBatch>> all_record_batches;
+ std::vector<BatchesWithSchema> input_batches(num_input_nodes);
+ BatchesWithSchema exp_batches;
+ exp_batches.schema = random_schema;
+ for (size_t i = 0; i < num_input_nodes; i++) {
+ GenerateBatchesFromSchema(random_schema, num_batches, &input_batches[i],
+ multiplicity, kBatchSize);
+ for (const auto& batch : input_batches[i].batches) {
+ exp_batches.batches.push_back(batch);
+ }
+ }
+ CheckRunOutput(input_batches, exp_batches, parallel);
}
-}
-
-void TestUnionRandom(const std::shared_ptr<DataType>& data_type, bool parallel,
- int num_batches, int batch_size) {
- auto l_schema = schema({field("colum0", data_type), field("colum1",
data_type)});
- auto r_schema = schema({field("colum0", data_type), field("colum1",
data_type)});
-
- // generate data
- auto l_batches = MakeRandomBatches(l_schema, num_batches, batch_size);
- auto r_batches = MakeRandomBatches(r_schema, num_batches, batch_size);
-
- ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
-
- Declaration Union{"union", ExecNodeOptions{}};
-
- // add left source
- Union.inputs.emplace_back(Declaration{
- "source", SourceNodeOptions{l_batches.schema, l_batches.gen(parallel,
-
/*slow=*/false)}});
- // add right source
- Union.inputs.emplace_back(Declaration{
- "source", SourceNodeOptions{r_batches.schema, r_batches.gen(parallel,
-
/*slow=*/false)}});
- AsyncGenerator<util::optional<ExecBatch>> sink_gen;
- ASSERT_OK(Declaration::Sequence({Union, {"sink",
SinkNodeOptions{&sink_gen}}})
- .AddToPlan(plan.get()));
+ ::arrow::random::SeedType seed_ = 0xdeadbeef;
+ ::arrow::random::RandomArrayGenerator rng_;
- auto actual = StartAndCollect(plan.get(), sink_gen);
+ static constexpr int kNumBatches = 10;
+ static constexpr int kBatchSize = 10;
+};
- BatchesWithSchema exp_batches;
- exp_batches.schema = l_schema;
- exp_batches.batches.reserve(l_batches.batches.size() +
r_batches.batches.size());
-
- std::copy(l_batches.batches.begin(), l_batches.batches.end(),
- std::back_inserter(exp_batches.batches));
- std::copy(r_batches.batches.begin(), r_batches.batches.end(),
- std::back_inserter(exp_batches.batches));
-
- auto expected_matcher =
- Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches)));
- ASSERT_THAT(actual, expected_matcher);
+TEST_F(TestUnionNode, TestNonEmpty) {
+ for (bool parallel : {false, true}) {
+ for (int64_t num_input_nodes : {1, 2, 4, 8}) {
+ this->CheckUnionExecNode(num_input_nodes, /*num_batches=*/kNumBatches,
parallel);
+ }
+ }
}
+TEST_F(TestUnionNode, TestWithAnEmptyBatch) { this->CheckUnionExecNode(2, 0,
false); }
-class UnionTestRand
- : public testing::TestWithParam<std::tuple<std::shared_ptr<DataType>,
bool>> {};
-
-static constexpr int kNumBatches = 100;
-static constexpr int kBatchSize = 10;
-
-INSTANTIATE_TEST_SUITE_P(UnionTestRand, UnionTestRand,
- ::testing::Combine(::testing::Values(int8(), int32(),
int64(),
- float32(),
float64()),
- ::testing::Values(false, true)));
-
-TEST_P(UnionTestRand, TestingTypes) {
- TestUnionRandom(std::get<0>(GetParam()), std::get<1>(GetParam()),
kNumBatches,
- kBatchSize);
-}
+TEST_F(TestUnionNode, TestEmpty) { this->CheckUnionExecNode(0, 0, false); }
Review comment:
And conversely, above in TestNonEmpty, the `/*num_batches=*/` argument
comment is redundant, since the named constant kNumBatches already says
what the value is.
##########
File path: cpp/src/arrow/compute/exec/union_node_test.cc
##########
@@ -17,187 +17,126 @@
#include <gmock/gmock-matchers.h>
-#include <iostream>
-
#include "arrow/api.h"
#include "arrow/compute/exec/options.h"
#include "arrow/compute/exec/test_util.h"
-#include "arrow/pretty_print.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/matchers.h"
+#include "arrow/testing/random.h"
using testing::UnorderedElementsAreArray;
namespace arrow {
namespace compute {
-void GenerateBatchesFromString(const std::shared_ptr<Schema>& schema,
- const std::vector<util::string_view>&
json_strings,
- BatchesWithSchema* out_batches, int
multiplicity = 1) {
- std::vector<ValueDescr> descrs;
- for (auto&& field : schema->fields()) {
- descrs.emplace_back(field->type());
- }
+struct TestUnionNode : public ::testing::Test {
+ TestUnionNode() : rng_(seed_) {}
- for (auto&& s : json_strings) {
- out_batches->batches.push_back(ExecBatchFromJSON(descrs, s));
- }
+ std::shared_ptr<Schema> GenerateRandomSchema(size_t num_inputs) {
+ static std::vector<std::shared_ptr<DataType>> some_arrow_types = {
+ arrow::null(), arrow::boolean(), arrow::int8(), arrow::int16(),
+ arrow::int32(), arrow::int64(), arrow::float16(), arrow::float32(),
+ arrow::float64(), arrow::utf8(), arrow::binary(), arrow::date32()};
- size_t batch_count = out_batches->batches.size();
- for (int repeat = 1; repeat < multiplicity; ++repeat) {
- for (size_t i = 0; i < batch_count; ++i) {
- out_batches->batches.push_back(out_batches->batches[i]);
+ std::vector<std::shared_ptr<Field>> fields(num_inputs);
+ for (size_t i = 0; i < num_inputs; i++) {
+ auto col_type = some_arrow_types.at(rand() % some_arrow_types.size());
+ fields[i] =
+ field("column_" + std::to_string(i) + "_" + col_type->ToString(),
col_type);
}
+ return schema(fields);
}
- out_batches->schema = schema;
-}
-
-void CheckRunOutput(const BatchesWithSchema& l_batches,
- const BatchesWithSchema& r_batches,
- const BatchesWithSchema& exp_batches, bool parallel =
false) {
- SCOPED_TRACE(parallel ? "parallel" : "single threaded");
-
- ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
-
- Declaration union_decl{"union", ExecNodeOptions{}};
-
- // add left source
- union_decl.inputs.emplace_back(Declaration{
- "source", SourceNodeOptions{l_batches.schema, l_batches.gen(parallel,
-
/*slow=*/false)}});
- // add right source
- union_decl.inputs.emplace_back(Declaration{
- "source", SourceNodeOptions{r_batches.schema, r_batches.gen(parallel,
-
/*slow=*/false)}});
- AsyncGenerator<util::optional<ExecBatch>> sink_gen;
-
- ASSERT_OK(Declaration::Sequence({union_decl, {"sink",
SinkNodeOptions{&sink_gen}}})
- .AddToPlan(plan.get()));
-
- Future<std::vector<ExecBatch>> actual = StartAndCollect(plan.get(),
sink_gen);
-
- auto expected_matcher =
- Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches)));
- ASSERT_THAT(actual, expected_matcher);
-}
-
-void RunNonEmptyTest(bool parallel) {
- auto l_schema = schema({field("colum_i32", int32()), field("colum_str",
utf8())});
- auto r_schema = schema({field("colum_i32", int32()), field("colum_str",
utf8())});
- BatchesWithSchema l_batches, r_batches, exp_batches;
-
- int multiplicity = parallel ? 100 : 1;
-
- GenerateBatchesFromString(l_schema,
- {
- R"([[0,"d"], [1,"b"]])",
- R"([[2,"d"], [3,"a"], [4,"a"]])",
- },
- &l_batches, multiplicity);
-
- GenerateBatchesFromString(r_schema,
- {
- R"([[10,"A"]])",
- },
- &r_batches, multiplicity);
-
- GenerateBatchesFromString(l_schema,
- {
- R"([[0,"d"], [1,"b"]])",
- R"([[2,"d"], [3,"a"], [4,"a"]])",
-
- R"([[10,"A"]])",
- },
- &exp_batches, multiplicity);
- CheckRunOutput(l_batches, r_batches, exp_batches, parallel);
-}
+ void GenerateBatchesFromSchema(const std::shared_ptr<Schema>& schema,
+ size_t num_batches, BatchesWithSchema*
out_batches,
+ int multiplicity = 1, int64_t batch_size = 4)
{
+ if (num_batches == 0) {
+ auto empty_record_batch = ExecBatch(*rng_.BatchOf(schema->fields(), 0));
+ out_batches->batches.push_back(empty_record_batch);
+ } else {
+ for (size_t j = 0; j < num_batches; j++) {
+ out_batches->batches.push_back(
+ ExecBatch(*rng_.BatchOf(schema->fields(), batch_size)));
+ }
+ }
-void RunEmptyTest(bool parallel) {
- auto l_schema = schema({field("colum_i32", int32()), field("colum_str",
utf8())});
- auto r_schema = schema({field("colum_i32", int32()), field("colum_str",
utf8())});
+ size_t batch_count = out_batches->batches.size();
+ for (int repeat = 1; repeat < multiplicity; ++repeat) {
+ for (size_t i = 0; i < batch_count; ++i) {
+ out_batches->batches.push_back(out_batches->batches[i]);
+ }
+ }
+ out_batches->schema = schema;
+ }
- int multiplicity = parallel ? 100 : 1;
+ void CheckRunOutput(const std::vector<BatchesWithSchema>& batches,
+ const BatchesWithSchema& exp_batches, bool parallel =
false) {
+ SCOPED_TRACE(parallel ? "parallel" : "single threaded");
- BatchesWithSchema l_empty, r_empty, output_batches;
+ ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
- GenerateBatchesFromString(l_schema, {R"([])"}, &l_empty, multiplicity);
- GenerateBatchesFromString(r_schema, {R"([])"}, &r_empty, multiplicity);
+ Declaration union_decl{"union", ExecNodeOptions{}};
- GenerateBatchesFromString(l_schema, {R"([])", R"([])"}, &output_batches,
multiplicity);
+ for (const auto& batch : batches) {
+ union_decl.inputs.emplace_back(Declaration{
+ "source", SourceNodeOptions{batch.schema, batch.gen(parallel,
+
/*slow=*/false)}});
+ }
+ AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+ // Test UnionNode::Make with zero inputs
+ if (batches.size() == 0) {
+ ASSERT_RAISES(Invalid, Declaration::Sequence(
+ {union_decl, {"sink",
SinkNodeOptions{&sink_gen}}})
+ .AddToPlan(plan.get()));
+ return;
+ } else {
+ ASSERT_OK(Declaration::Sequence({union_decl, {"sink",
SinkNodeOptions{&sink_gen}}})
+ .AddToPlan(plan.get()));
+ }
- CheckRunOutput(l_empty, r_empty, output_batches);
-}
+ Future<std::vector<ExecBatch>> actual = StartAndCollect(plan.get(),
sink_gen);
-TEST(UnionTest, TestNonEmpty) {
- for (bool parallel : {false, true}) {
- RunNonEmptyTest(parallel);
+ auto expected_matcher =
+ Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches)));
+ ASSERT_THAT(actual, expected_matcher);
}
-}
-TEST(UnionTest, TestEmpty) {
- for (bool parallel : {false, true}) {
- RunEmptyTest(parallel);
+ void CheckUnionExecNode(size_t num_input_nodes, size_t num_batches, bool
parallel) {
+ auto random_schema = GenerateRandomSchema(num_input_nodes);
+
+ int multiplicity = parallel ? 10 : 1;
+ std::vector<std::shared_ptr<RecordBatch>> all_record_batches;
+ std::vector<BatchesWithSchema> input_batches(num_input_nodes);
+ BatchesWithSchema exp_batches;
+ exp_batches.schema = random_schema;
+ for (size_t i = 0; i < num_input_nodes; i++) {
+ GenerateBatchesFromSchema(random_schema, num_batches, &input_batches[i],
+ multiplicity, kBatchSize);
+ for (const auto& batch : input_batches[i].batches) {
+ exp_batches.batches.push_back(batch);
+ }
+ }
+ CheckRunOutput(input_batches, exp_batches, parallel);
}
-}
-
-void TestUnionRandom(const std::shared_ptr<DataType>& data_type, bool parallel,
- int num_batches, int batch_size) {
- auto l_schema = schema({field("colum0", data_type), field("colum1",
data_type)});
- auto r_schema = schema({field("colum0", data_type), field("colum1",
data_type)});
-
- // generate data
- auto l_batches = MakeRandomBatches(l_schema, num_batches, batch_size);
- auto r_batches = MakeRandomBatches(r_schema, num_batches, batch_size);
-
- ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
-
- Declaration Union{"union", ExecNodeOptions{}};
-
- // add left source
- Union.inputs.emplace_back(Declaration{
- "source", SourceNodeOptions{l_batches.schema, l_batches.gen(parallel,
-
/*slow=*/false)}});
- // add right source
- Union.inputs.emplace_back(Declaration{
- "source", SourceNodeOptions{r_batches.schema, r_batches.gen(parallel,
-
/*slow=*/false)}});
- AsyncGenerator<util::optional<ExecBatch>> sink_gen;
- ASSERT_OK(Declaration::Sequence({Union, {"sink",
SinkNodeOptions{&sink_gen}}})
- .AddToPlan(plan.get()));
+ ::arrow::random::SeedType seed_ = 0xdeadbeef;
+ ::arrow::random::RandomArrayGenerator rng_;
- auto actual = StartAndCollect(plan.get(), sink_gen);
+ static constexpr int kNumBatches = 10;
+ static constexpr int kBatchSize = 10;
+};
- BatchesWithSchema exp_batches;
- exp_batches.schema = l_schema;
- exp_batches.batches.reserve(l_batches.batches.size() +
r_batches.batches.size());
-
- std::copy(l_batches.batches.begin(), l_batches.batches.end(),
- std::back_inserter(exp_batches.batches));
- std::copy(r_batches.batches.begin(), r_batches.batches.end(),
- std::back_inserter(exp_batches.batches));
-
- auto expected_matcher =
- Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches)));
- ASSERT_THAT(actual, expected_matcher);
+TEST_F(TestUnionNode, TestNonEmpty) {
+ for (bool parallel : {false, true}) {
+ for (int64_t num_input_nodes : {1, 2, 4, 8}) {
+ this->CheckUnionExecNode(num_input_nodes, /*num_batches=*/kNumBatches,
parallel);
+ }
+ }
}
+TEST_F(TestUnionNode, TestWithAnEmptyBatch) { this->CheckUnionExecNode(2, 0,
false); }
-class UnionTestRand
- : public testing::TestWithParam<std::tuple<std::shared_ptr<DataType>,
bool>> {};
-
-static constexpr int kNumBatches = 100;
-static constexpr int kBatchSize = 10;
-
-INSTANTIATE_TEST_SUITE_P(UnionTestRand, UnionTestRand,
- ::testing::Combine(::testing::Values(int8(), int32(),
int64(),
- float32(),
float64()),
- ::testing::Values(false, true)));
-
-TEST_P(UnionTestRand, TestingTypes) {
- TestUnionRandom(std::get<0>(GetParam()), std::get<1>(GetParam()),
kNumBatches,
- kBatchSize);
-}
+TEST_F(TestUnionNode, TestEmpty) { this->CheckUnionExecNode(0, 0, false); }
Review comment:
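Nit: usually for literal arguments like these, we prefix each value with a
comment giving the parameter name, e.g.
`CheckUnionExecNode(/*num_input_nodes=*/0, /*num_batches=*/0, /*parallel=*/false)`.
See
https://google.github.io/styleguide/cppguide.html#Function_Argument_Comments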
##########
File path: cpp/src/arrow/compute/exec/union_node_test.cc
##########
@@ -17,187 +17,126 @@
#include <gmock/gmock-matchers.h>
-#include <iostream>
-
#include "arrow/api.h"
#include "arrow/compute/exec/options.h"
#include "arrow/compute/exec/test_util.h"
-#include "arrow/pretty_print.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/matchers.h"
+#include "arrow/testing/random.h"
using testing::UnorderedElementsAreArray;
namespace arrow {
namespace compute {
-void GenerateBatchesFromString(const std::shared_ptr<Schema>& schema,
- const std::vector<util::string_view>&
json_strings,
- BatchesWithSchema* out_batches, int
multiplicity = 1) {
- std::vector<ValueDescr> descrs;
- for (auto&& field : schema->fields()) {
- descrs.emplace_back(field->type());
- }
+struct TestUnionNode : public ::testing::Test {
+ TestUnionNode() : rng_(seed_) {}
- for (auto&& s : json_strings) {
- out_batches->batches.push_back(ExecBatchFromJSON(descrs, s));
- }
+ std::shared_ptr<Schema> GenerateRandomSchema(size_t num_inputs) {
+ static std::vector<std::shared_ptr<DataType>> some_arrow_types = {
+ arrow::null(), arrow::boolean(), arrow::int8(), arrow::int16(),
+ arrow::int32(), arrow::int64(), arrow::float16(), arrow::float32(),
+ arrow::float64(), arrow::utf8(), arrow::binary(), arrow::date32()};
- size_t batch_count = out_batches->batches.size();
- for (int repeat = 1; repeat < multiplicity; ++repeat) {
- for (size_t i = 0; i < batch_count; ++i) {
- out_batches->batches.push_back(out_batches->batches[i]);
+ std::vector<std::shared_ptr<Field>> fields(num_inputs);
+ for (size_t i = 0; i < num_inputs; i++) {
+ auto col_type = some_arrow_types.at(rand() % some_arrow_types.size());
+ fields[i] =
+ field("column_" + std::to_string(i) + "_" + col_type->ToString(),
col_type);
}
+ return schema(fields);
}
- out_batches->schema = schema;
-}
-
-void CheckRunOutput(const BatchesWithSchema& l_batches,
- const BatchesWithSchema& r_batches,
- const BatchesWithSchema& exp_batches, bool parallel =
false) {
- SCOPED_TRACE(parallel ? "parallel" : "single threaded");
-
- ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
-
- Declaration union_decl{"union", ExecNodeOptions{}};
-
- // add left source
- union_decl.inputs.emplace_back(Declaration{
- "source", SourceNodeOptions{l_batches.schema, l_batches.gen(parallel,
-
/*slow=*/false)}});
- // add right source
- union_decl.inputs.emplace_back(Declaration{
- "source", SourceNodeOptions{r_batches.schema, r_batches.gen(parallel,
-
/*slow=*/false)}});
- AsyncGenerator<util::optional<ExecBatch>> sink_gen;
-
- ASSERT_OK(Declaration::Sequence({union_decl, {"sink",
SinkNodeOptions{&sink_gen}}})
- .AddToPlan(plan.get()));
-
- Future<std::vector<ExecBatch>> actual = StartAndCollect(plan.get(),
sink_gen);
-
- auto expected_matcher =
- Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches)));
- ASSERT_THAT(actual, expected_matcher);
-}
-
-void RunNonEmptyTest(bool parallel) {
- auto l_schema = schema({field("colum_i32", int32()), field("colum_str",
utf8())});
- auto r_schema = schema({field("colum_i32", int32()), field("colum_str",
utf8())});
- BatchesWithSchema l_batches, r_batches, exp_batches;
-
- int multiplicity = parallel ? 100 : 1;
-
- GenerateBatchesFromString(l_schema,
- {
- R"([[0,"d"], [1,"b"]])",
- R"([[2,"d"], [3,"a"], [4,"a"]])",
- },
- &l_batches, multiplicity);
-
- GenerateBatchesFromString(r_schema,
- {
- R"([[10,"A"]])",
- },
- &r_batches, multiplicity);
-
- GenerateBatchesFromString(l_schema,
- {
- R"([[0,"d"], [1,"b"]])",
- R"([[2,"d"], [3,"a"], [4,"a"]])",
-
- R"([[10,"A"]])",
- },
- &exp_batches, multiplicity);
- CheckRunOutput(l_batches, r_batches, exp_batches, parallel);
-}
+ void GenerateBatchesFromSchema(const std::shared_ptr<Schema>& schema,
+ size_t num_batches, BatchesWithSchema*
out_batches,
+ int multiplicity = 1, int64_t batch_size = 4)
{
+ if (num_batches == 0) {
+ auto empty_record_batch = ExecBatch(*rng_.BatchOf(schema->fields(), 0));
+ out_batches->batches.push_back(empty_record_batch);
+ } else {
+ for (size_t j = 0; j < num_batches; j++) {
+ out_batches->batches.push_back(
+ ExecBatch(*rng_.BatchOf(schema->fields(), batch_size)));
+ }
+ }
-void RunEmptyTest(bool parallel) {
- auto l_schema = schema({field("colum_i32", int32()), field("colum_str",
utf8())});
- auto r_schema = schema({field("colum_i32", int32()), field("colum_str",
utf8())});
+ size_t batch_count = out_batches->batches.size();
+ for (int repeat = 1; repeat < multiplicity; ++repeat) {
+ for (size_t i = 0; i < batch_count; ++i) {
+ out_batches->batches.push_back(out_batches->batches[i]);
+ }
+ }
+ out_batches->schema = schema;
+ }
- int multiplicity = parallel ? 100 : 1;
+ void CheckRunOutput(const std::vector<BatchesWithSchema>& batches,
+ const BatchesWithSchema& exp_batches, bool parallel =
false) {
+ SCOPED_TRACE(parallel ? "parallel" : "single threaded");
- BatchesWithSchema l_empty, r_empty, output_batches;
+ ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
- GenerateBatchesFromString(l_schema, {R"([])"}, &l_empty, multiplicity);
- GenerateBatchesFromString(r_schema, {R"([])"}, &r_empty, multiplicity);
+ Declaration union_decl{"union", ExecNodeOptions{}};
- GenerateBatchesFromString(l_schema, {R"([])", R"([])"}, &output_batches,
multiplicity);
+ for (const auto& batch : batches) {
+ union_decl.inputs.emplace_back(Declaration{
+ "source", SourceNodeOptions{batch.schema, batch.gen(parallel,
+
/*slow=*/false)}});
+ }
+ AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+ // Test UnionNode::Make with zero inputs
+ if (batches.size() == 0) {
+ ASSERT_RAISES(Invalid, Declaration::Sequence(
+ {union_decl, {"sink",
SinkNodeOptions{&sink_gen}}})
+ .AddToPlan(plan.get()));
+ return;
+ } else {
+ ASSERT_OK(Declaration::Sequence({union_decl, {"sink",
SinkNodeOptions{&sink_gen}}})
+ .AddToPlan(plan.get()));
+ }
- CheckRunOutput(l_empty, r_empty, output_batches);
-}
+ Future<std::vector<ExecBatch>> actual = StartAndCollect(plan.get(),
sink_gen);
-TEST(UnionTest, TestNonEmpty) {
- for (bool parallel : {false, true}) {
- RunNonEmptyTest(parallel);
+ auto expected_matcher =
+ Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches)));
+ ASSERT_THAT(actual, expected_matcher);
}
-}
-TEST(UnionTest, TestEmpty) {
- for (bool parallel : {false, true}) {
- RunEmptyTest(parallel);
+ void CheckUnionExecNode(size_t num_input_nodes, size_t num_batches, bool
parallel) {
+ auto random_schema = GenerateRandomSchema(num_input_nodes);
+
+ int multiplicity = parallel ? 10 : 1;
+ std::vector<std::shared_ptr<RecordBatch>> all_record_batches;
+ std::vector<BatchesWithSchema> input_batches(num_input_nodes);
+ BatchesWithSchema exp_batches;
+ exp_batches.schema = random_schema;
+ for (size_t i = 0; i < num_input_nodes; i++) {
+ GenerateBatchesFromSchema(random_schema, num_batches, &input_batches[i],
+ multiplicity, kBatchSize);
+ for (const auto& batch : input_batches[i].batches) {
+ exp_batches.batches.push_back(batch);
+ }
+ }
+ CheckRunOutput(input_batches, exp_batches, parallel);
}
-}
-
-void TestUnionRandom(const std::shared_ptr<DataType>& data_type, bool parallel,
- int num_batches, int batch_size) {
- auto l_schema = schema({field("colum0", data_type), field("colum1",
data_type)});
- auto r_schema = schema({field("colum0", data_type), field("colum1",
data_type)});
-
- // generate data
- auto l_batches = MakeRandomBatches(l_schema, num_batches, batch_size);
- auto r_batches = MakeRandomBatches(r_schema, num_batches, batch_size);
-
- ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
-
- Declaration Union{"union", ExecNodeOptions{}};
-
- // add left source
- Union.inputs.emplace_back(Declaration{
- "source", SourceNodeOptions{l_batches.schema, l_batches.gen(parallel,
-
/*slow=*/false)}});
- // add right source
- Union.inputs.emplace_back(Declaration{
- "source", SourceNodeOptions{r_batches.schema, r_batches.gen(parallel,
-
/*slow=*/false)}});
- AsyncGenerator<util::optional<ExecBatch>> sink_gen;
- ASSERT_OK(Declaration::Sequence({Union, {"sink",
SinkNodeOptions{&sink_gen}}})
- .AddToPlan(plan.get()));
+ ::arrow::random::SeedType seed_ = 0xdeadbeef;
+ ::arrow::random::RandomArrayGenerator rng_;
- auto actual = StartAndCollect(plan.get(), sink_gen);
+ static constexpr int kNumBatches = 10;
+ static constexpr int kBatchSize = 10;
+};
- BatchesWithSchema exp_batches;
- exp_batches.schema = l_schema;
- exp_batches.batches.reserve(l_batches.batches.size() +
r_batches.batches.size());
-
- std::copy(l_batches.batches.begin(), l_batches.batches.end(),
- std::back_inserter(exp_batches.batches));
- std::copy(r_batches.batches.begin(), r_batches.batches.end(),
- std::back_inserter(exp_batches.batches));
-
- auto expected_matcher =
- Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches)));
- ASSERT_THAT(actual, expected_matcher);
+TEST_F(TestUnionNode, TestNonEmpty) {
+ for (bool parallel : {false, true}) {
+ for (int64_t num_input_nodes : {1, 2, 4, 8}) {
+ this->CheckUnionExecNode(num_input_nodes, /*num_batches=*/kNumBatches,
parallel);
+ }
+ }
}
+TEST_F(TestUnionNode, TestWithAnEmptyBatch) { this->CheckUnionExecNode(2, 0,
false); }
-class UnionTestRand
- : public testing::TestWithParam<std::tuple<std::shared_ptr<DataType>,
bool>> {};
-
-static constexpr int kNumBatches = 100;
-static constexpr int kBatchSize = 10;
-
-INSTANTIATE_TEST_SUITE_P(UnionTestRand, UnionTestRand,
- ::testing::Combine(::testing::Values(int8(), int32(),
int64(),
- float32(),
float64()),
- ::testing::Values(false, true)));
-
-TEST_P(UnionTestRand, TestingTypes) {
- TestUnionRandom(std::get<0>(GetParam()), std::get<1>(GetParam()),
kNumBatches,
- kBatchSize);
-}
+TEST_F(TestUnionNode, TestEmpty) { this->CheckUnionExecNode(0, 0, false); }
Review comment:
```suggestion
TEST_F(TestUnionNode, TestEmpty) {
this->CheckUnionExecNode(/*num_input_nodes=*/0, /*num_batches=*/0,
/*parallel=*/false);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]