westonpace commented on code in PR #39487: URL: https://github.com/apache/arrow/pull/39487#discussion_r1505997417
########## cpp/src/arrow/acero/hash_join_node_test.cc: ########## @@ -1893,58 +1893,146 @@ TEST(HashJoin, CheckHashJoinNodeOptionsValidation) { } } -TEST(HashJoin, ResidualFilter) { - for (bool parallel : {false, true}) { - SCOPED_TRACE(parallel ? "parallel/merged" : "serial"); - - BatchesWithSchema input_left; - input_left.batches = {ExecBatchFromJSON({int32(), int32(), utf8()}, R"([ - [1, 6, "alpha"], - [2, 5, "beta"], - [3, 4, "alpha"] - ])")}; - input_left.schema = - schema({field("l1", int32()), field("l2", int32()), field("l_str", utf8())}); - - BatchesWithSchema input_right; - input_right.batches = {ExecBatchFromJSON({int32(), int32(), utf8()}, R"([ - [5, 11, "alpha"], - [2, 12, "beta"], - [4, 16, "alpha"] - ])")}; - input_right.schema = - schema({field("r1", int32()), field("r2", int32()), field("r_str", utf8())}); +class ResidualFilterCaseRunner { + public: + ResidualFilterCaseRunner(BatchesWithSchema left_input, BatchesWithSchema right_input) + : left_input_(std::move(left_input)), right_input_(std::move(right_input)) {} + + void Run(JoinType join_type, std::vector<FieldRef> left_keys, + std::vector<FieldRef> right_keys, Expression filter, + const std::vector<ExecBatch>& expected) const { + RunInternal(HashJoinNodeOptions{join_type, std::move(left_keys), + std::move(right_keys), std::move(filter)}, + expected); + } + + void Run(JoinType join_type, std::vector<FieldRef> left_keys, + std::vector<FieldRef> right_keys, std::vector<FieldRef> left_output, + std::vector<FieldRef> right_output, Expression filter, + const std::vector<ExecBatch>& expected) const { + RunInternal(HashJoinNodeOptions{join_type, std::move(left_keys), + std::move(right_keys), std::move(left_output), + std::move(right_output), std::move(filter)}, + expected); + } + + private: + void RunInternal(const HashJoinNodeOptions& options, + const std::vector<ExecBatch>& expected) const { + auto join_type_str = JoinTypeString(options.join_type); + auto join_cond_str = + JoinConditionString(options.left_keys, options.right_keys, options.filter); + auto output_str = OutputString(options.left_output, options.right_output); + for (bool parallel : {false, true}) { + auto parallel_str = parallel ? "parallel" : "serial"; + ARROW_SCOPED_TRACE(join_type_str + " " + join_cond_str + " " + output_str + " " + + parallel_str); - Declaration left{ - "source", - SourceNodeOptions{input_left.schema, input_left.gen(parallel, /*slow=*/false)}}; - Declaration right{ - "source", - SourceNodeOptions{input_right.schema, input_right.gen(parallel, /*slow=*/false)}}; + Declaration left{"source", + SourceNodeOptions{left_input_.schema, + left_input_.gen(parallel, /*slow=*/false)}}; + Declaration right{"source", + SourceNodeOptions{right_input_.schema, + right_input_.gen(parallel, /*slow=*/false)}}; - Expression mul = call("multiply", {field_ref("l1"), field_ref("l2")}); - Expression combination = call("add", {mul, field_ref("r1")}); - Expression residual_filter = less_equal(combination, field_ref("r2")); + Declaration join{"hashjoin", {std::move(left), std::move(right)}, options}; - HashJoinNodeOptions join_opts{ - JoinType::FULL_OUTER, - /*left_keys=*/{"l_str"}, - /*right_keys=*/{"r_str"}, std::move(residual_filter), "l_", "r_"}; + ASSERT_OK_AND_ASSIGN(auto result, + DeclarationToExecBatches(std::move(join), parallel)); + AssertExecBatchesEqualIgnoringOrder(result.schema, expected, result.batches); + } + } - Declaration join{"hashjoin", {std::move(left), std::move(right)}, join_opts}; + private: + BatchesWithSchema left_input_; + BatchesWithSchema right_input_; - ASSERT_OK_AND_ASSIGN(auto result, - DeclarationToExecBatches(std::move(join), parallel)); + private: + static std::string JoinTypeString(JoinType t) { + switch (t) { + case JoinType::LEFT_SEMI: + return "LEFT_SEMI"; + case JoinType::RIGHT_SEMI: + return "RIGHT_SEMI"; + case JoinType::LEFT_ANTI: + return "LEFT_ANTI"; + case JoinType::RIGHT_ANTI: + return "RIGHT_ANTI"; + case JoinType::INNER: + return "INNER"; + case JoinType::LEFT_OUTER: + return "LEFT_OUTER"; + case JoinType::RIGHT_OUTER: + return "RIGHT_OUTER"; + case JoinType::FULL_OUTER: + return "FULL_OUTER"; + } + ARROW_DCHECK(false); + return ""; + } + + static std::string JoinConditionString(const std::vector<FieldRef>& left_keys, + const std::vector<FieldRef>& right_keys, + const Expression& filter) { + ARROW_DCHECK(left_keys.size() > 0); + ARROW_DCHECK(left_keys.size() == right_keys.size()); + std::stringstream ss; + ss << "on ("; + for (size_t i = 0; i < left_keys.size(); ++i) { + ss << left_keys[i].ToString() << " = " << right_keys[i].ToString() << " and "; + } + ss << filter.ToString(); + ss << ")"; + return ss.str(); + } + + static std::string OutputString(const std::vector<FieldRef>& left_output, + const std::vector<FieldRef>& right_output) { + std::vector<FieldRef> both_output; + std::copy(left_output.begin(), left_output.end(), std::back_inserter(both_output)); + std::copy(right_output.begin(), right_output.end(), std::back_inserter(both_output)); Review Comment: Does `std::vector::insert` not work here? ########## cpp/src/arrow/acero/hash_join_node_test.cc: ########## @@ -1959,47 +2047,993 @@ TEST(HashJoin, TrivialResidualFilter) { std::vector<std::string> expected_strings = {expected_true, expected_false}; std::vector<Expression> filters = {always_true, always_false}; + BatchesWithSchema input_left; + input_left.batches = {ExecBatchFromJSON({int32(), utf8()}, R"([ + [1, "alpha"]])")}; + input_left.schema = schema({field("l1", int32()), field("l_str", utf8())}); + + BatchesWithSchema input_right; + input_right.batches = {ExecBatchFromJSON({int32(), utf8()}, R"([ + [1, "alpha"]])")}; + input_right.schema = schema({field("r1", int32()), field("r_str", utf8())}); + + ResidualFilterCaseRunner runner{std::move(input_left), std::move(input_right)}; + for (size_t test_id = 0; test_id < 2; test_id++) { - for (bool parallel : {false, true}) { - SCOPED_TRACE(parallel ? "parallel/merged" : "serial"); + runner.Run(JoinType::INNER, {"l_str"}, {"r_str"}, filters[test_id], + {ExecBatchFromJSON({int32(), utf8(), int32(), utf8()}, + expected_strings[test_id])}); + } +} - BatchesWithSchema input_left; - input_left.batches = {ExecBatchFromJSON({int32(), utf8()}, R"([ - [1, "alpha"] - ])")}; - input_left.schema = schema({field("l1", int32()), field("l_str", utf8())}); +TEST(HashJoin, FineGrainedResidualFilter) { + struct JoinSchema { + std::shared_ptr<Schema> left, right; - BatchesWithSchema input_right; - input_right.batches = {ExecBatchFromJSON({int32(), utf8()}, R"([ - [1, "alpha"] - ])")}; - input_right.schema = schema({field("r1", int32()), field("r_str", utf8())}); + struct Projector { + std::shared_ptr<Schema> left, right; + std::vector<int> left_output, right_output; - auto exec_ctx = std::make_unique<ExecContext>( - default_memory_pool(), - parallel ? arrow::internal::GetCpuThreadPool() : nullptr); + std::vector<FieldRef> LeftOutput(JoinType join_type) const { + if (join_type == JoinType::RIGHT_SEMI || join_type == JoinType::RIGHT_ANTI) { + return {}; + } + std::vector<FieldRef> output(left_output.size()); + std::transform(left_output.begin(), left_output.end(), output.begin(), + [](int i) { return i; }); + return output; + } - Declaration left{ - "source", - SourceNodeOptions{input_left.schema, input_left.gen(parallel, /*slow=*/false)}}; - Declaration right{"source", - SourceNodeOptions{input_right.schema, - input_right.gen(parallel, /*slow=*/false)}}; + std::vector<FieldRef> RightOutput(JoinType join_type) const { + if (join_type == JoinType::LEFT_SEMI || join_type == JoinType::LEFT_ANTI) { + return {}; + } + std::vector<FieldRef> output(right_output.size()); + std::transform(right_output.begin(), right_output.end(), output.begin(), + [](int i) { return i; }); + return output; + } - HashJoinNodeOptions join_opts{ - JoinType::INNER, - /*left_keys=*/{"l_str"}, - /*right_keys=*/{"r_str"}, filters[test_id], "l_", "r_"}; + ExecBatch Project(JoinType join_type, const ExecBatch& batch) const { + std::vector<Datum> values; + if (join_type != JoinType::RIGHT_SEMI && join_type != JoinType::RIGHT_ANTI) { + for (int i : left_output) { + values.push_back(batch[i]); + } + } + if (join_type != JoinType::LEFT_SEMI && join_type != JoinType::LEFT_ANTI) { + int left_size = + join_type == JoinType::RIGHT_SEMI || join_type == JoinType::RIGHT_ANTI + ? 0 + : left->num_fields(); + for (int i : right_output) { + values.push_back(batch[left_size + i]); + } + } + return {std::move(values), batch.length}; + } + }; - Declaration join{"hashjoin", {std::move(left), std::move(right)}, join_opts}; + Projector GetProjector(std::vector<int> left_output, std::vector<int> right_output) { + return Projector{left, right, std::move(left_output), std::move(right_output)}; + } + }; + + BatchesWithSchema left; + left.batches = {ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ + [null, null, "payload"], + [null, 0, "payload"], + [null, 42, "payload"], + ["left_only", null, "payload"], + ["left_only", 0, "payload"], + ["left_only", 42, "payload"], + ["both1", null, "payload"], + ["both1", 0, "payload"], + ["both1", 42, "payload"], + ["both2", null, "payload"], + ["both2", 0, "payload"], + ["both2", 42, "payload"]])")}; + left.schema = schema( + {field("l_key", utf8()), field("l_filter", int32()), field("l_payload", utf8())}); + + BatchesWithSchema right; + right.batches = {ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ + [null, null, "payload"], + [null, 0, "payload"], + [null, 42, "payload"], + ["both1", null, "payload"], + ["both1", 0, "payload"], + ["both1", 42, "payload"], + ["both2", null, "payload"], + ["both2", 0, "payload"], + ["both2", 42, "payload"], + ["right_only", null, "payload"], + ["right_only", 0, "payload"], + ["right_only", 42, "payload"]])")}; Review Comment: Minor nit: If you made the payload on the right side something like `r_payload` it might help clarify some of the test cases (e.g. left_semi and right_semi look identical right now even though they aren't). ########## cpp/src/arrow/acero/hash_join_node_test.cc: ########## @@ -1959,47 +2047,993 @@ TEST(HashJoin, TrivialResidualFilter) { std::vector<std::string> expected_strings = {expected_true, expected_false}; std::vector<Expression> filters = {always_true, always_false}; + BatchesWithSchema input_left; + input_left.batches = {ExecBatchFromJSON({int32(), utf8()}, R"([ + [1, "alpha"]])")}; + input_left.schema = schema({field("l1", int32()), field("l_str", utf8())}); + + BatchesWithSchema input_right; + input_right.batches = {ExecBatchFromJSON({int32(), utf8()}, R"([ + [1, "alpha"]])")}; + input_right.schema = schema({field("r1", int32()), field("r_str", utf8())}); + + ResidualFilterCaseRunner runner{std::move(input_left), std::move(input_right)}; + for (size_t test_id = 0; test_id < 2; test_id++) { - for (bool parallel : {false, true}) { - SCOPED_TRACE(parallel ? "parallel/merged" : "serial"); + runner.Run(JoinType::INNER, {"l_str"}, {"r_str"}, filters[test_id], + {ExecBatchFromJSON({int32(), utf8(), int32(), utf8()}, + expected_strings[test_id])}); + } +} - BatchesWithSchema input_left; - input_left.batches = {ExecBatchFromJSON({int32(), utf8()}, R"([ - [1, "alpha"] - ])")}; - input_left.schema = schema({field("l1", int32()), field("l_str", utf8())}); +TEST(HashJoin, FineGrainedResidualFilter) { + struct JoinSchema { + std::shared_ptr<Schema> left, right; - BatchesWithSchema input_right; - input_right.batches = {ExecBatchFromJSON({int32(), utf8()}, R"([ - [1, "alpha"] - ])")}; - input_right.schema = schema({field("r1", int32()), field("r_str", utf8())}); + struct Projector { + std::shared_ptr<Schema> left, right; + std::vector<int> left_output, right_output; - auto exec_ctx = std::make_unique<ExecContext>( - default_memory_pool(), - parallel ? arrow::internal::GetCpuThreadPool() : nullptr); + std::vector<FieldRef> LeftOutput(JoinType join_type) const { + if (join_type == JoinType::RIGHT_SEMI || join_type == JoinType::RIGHT_ANTI) { + return {}; + } + std::vector<FieldRef> output(left_output.size()); + std::transform(left_output.begin(), left_output.end(), output.begin(), + [](int i) { return i; }); + return output; + } - Declaration left{ - "source", - SourceNodeOptions{input_left.schema, input_left.gen(parallel, /*slow=*/false)}}; - Declaration right{"source", - SourceNodeOptions{input_right.schema, - input_right.gen(parallel, /*slow=*/false)}}; + std::vector<FieldRef> RightOutput(JoinType join_type) const { + if (join_type == JoinType::LEFT_SEMI || join_type == JoinType::LEFT_ANTI) { + return {}; + } + std::vector<FieldRef> output(right_output.size()); + std::transform(right_output.begin(), right_output.end(), output.begin(), + [](int i) { return i; }); + return output; + } - HashJoinNodeOptions join_opts{ - JoinType::INNER, - /*left_keys=*/{"l_str"}, - /*right_keys=*/{"r_str"}, filters[test_id], "l_", "r_"}; + ExecBatch Project(JoinType join_type, const ExecBatch& batch) const { + std::vector<Datum> values; + if (join_type != JoinType::RIGHT_SEMI && join_type != JoinType::RIGHT_ANTI) { + for (int i : left_output) { + values.push_back(batch[i]); + } + } + if (join_type != JoinType::LEFT_SEMI && join_type != JoinType::LEFT_ANTI) { + int left_size = + join_type == JoinType::RIGHT_SEMI || join_type == JoinType::RIGHT_ANTI + ? 0 + : left->num_fields(); + for (int i : right_output) { + values.push_back(batch[left_size + i]); + } + } + return {std::move(values), batch.length}; + } + }; - Declaration join{"hashjoin", {std::move(left), std::move(right)}, join_opts}; + Projector GetProjector(std::vector<int> left_output, std::vector<int> right_output) { + return Projector{left, right, std::move(left_output), std::move(right_output)}; + } + }; + + BatchesWithSchema left; + left.batches = {ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ + [null, null, "payload"], + [null, 0, "payload"], + [null, 42, "payload"], + ["left_only", null, "payload"], + ["left_only", 0, "payload"], + ["left_only", 42, "payload"], + ["both1", null, "payload"], + ["both1", 0, "payload"], + ["both1", 42, "payload"], + ["both2", null, "payload"], + ["both2", 0, "payload"], + ["both2", 42, "payload"]])")}; + left.schema = schema( + {field("l_key", utf8()), field("l_filter", int32()), field("l_payload", utf8())}); + + BatchesWithSchema right; + right.batches = {ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ + [null, null, "payload"], + [null, 0, "payload"], + [null, 42, "payload"], + ["both1", null, "payload"], + ["both1", 0, "payload"], + ["both1", 42, "payload"], + ["both2", null, "payload"], + ["both2", 0, "payload"], + ["both2", 42, "payload"], + ["right_only", null, "payload"], + ["right_only", 0, "payload"], + ["right_only", 42, "payload"]])")}; + right.schema = schema( + {field("r_key", utf8()), field("r_filter", int32()), field("r_payload", utf8())}); + + JoinSchema join_schema{left.schema, right.schema}; + std::vector<JoinSchema::Projector> projectors{ + join_schema.GetProjector({0, 1, 2}, {0, 1, 2}), // Output all. + join_schema.GetProjector({0}, {0}), // Output key columns only. + join_schema.GetProjector({1}, {1}), // Output filter columns only. + join_schema.GetProjector({2}, {2})}; // Output payload columns only. + + const ResidualFilterCaseRunner runner{std::move(left), std::move(right)}; - ASSERT_OK_AND_ASSIGN(auto result, - DeclarationToExecBatches(std::move(join), parallel)); + { + // Literal true and scalar true. + for (Expression filter : {literal(true), equal(literal(1), literal(1))}) { + std::vector<FieldRef> left_keys{"l_key", "l_filter"}, + right_keys{"r_key", "r_filter"}; + { + // Inner join. + JoinType join_type = JoinType::INNER; + auto expected = + ExecBatchFromJSON({utf8(), int32(), utf8(), utf8(), int32(), utf8()}, R"([ + ["both1", 0, "payload", "both1", 0, "payload"], + ["both1", 42, "payload", "both1", 42, "payload"], + ["both2", 0, "payload", "both2", 0, "payload"], + ["both2", 42, "payload", "both2", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } - std::vector<ExecBatch> expected = {ExecBatchFromJSON( - {int32(), utf8(), int32(), utf8()}, expected_strings[test_id])}; + { + // Left outer join. + JoinType join_type = JoinType::LEFT_OUTER; + auto expected = + ExecBatchFromJSON({utf8(), int32(), utf8(), utf8(), int32(), utf8()}, R"([ + [null, null, "payload", null, null, null], + [null, 0, "payload", null, null, null], + [null, 42, "payload", null, null, null], + ["left_only", null, "payload", null, null, null], + ["left_only", 0, "payload", null, null, null], + ["left_only", 42, "payload", null, null, null], + ["both1", null, "payload", null, null, null], + ["both2", null, "payload", null, null, null], + ["both1", 0, "payload", "both1", 0, "payload"], + ["both1", 42, "payload", "both1", 42, "payload"], + ["both2", 0, "payload", "both2", 0, "payload"], + ["both2", 42, "payload", "both2", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } - AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, expected); + { + // Right outer join. + JoinType join_type = JoinType::RIGHT_OUTER; + auto expected = + ExecBatchFromJSON({utf8(), int32(), utf8(), utf8(), int32(), utf8()}, R"([ + ["both1", 0, "payload", "both1", 0, "payload"], + ["both1", 42, "payload", "both1", 42, "payload"], + ["both2", 0, "payload", "both2", 0, "payload"], + ["both2", 42, "payload", "both2", 42, "payload"], + [null, null, null, null, null, "payload"], + [null, null, null, null, 0, "payload"], + [null, null, null, null, 42, "payload"], + [null, null, null, "both1", null, "payload"], + [null, null, null, "both2", null, "payload"], + [null, null, null, "right_only", null, "payload"], + [null, null, null, "right_only", 0, "payload"], + [null, null, null, "right_only", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Full outer join. + JoinType join_type = JoinType::FULL_OUTER; + auto expected = + ExecBatchFromJSON({utf8(), int32(), utf8(), utf8(), int32(), utf8()}, R"([ + [null, null, "payload", null, null, null], + [null, 0, "payload", null, null, null], + [null, 42, "payload", null, null, null], + ["left_only", null, "payload", null, null, null], + ["left_only", 0, "payload", null, null, null], + ["left_only", 42, "payload", null, null, null], + ["both1", null, "payload", null, null, null], + ["both2", null, "payload", null, null, null], + ["both1", 0, "payload", "both1", 0, "payload"], + ["both1", 42, "payload", "both1", 42, "payload"], + ["both2", 0, "payload", "both2", 0, "payload"], + ["both2", 42, "payload", "both2", 42, "payload"], + [null, null, null, null, null, "payload"], + [null, null, null, null, 0, "payload"], + [null, null, null, null, 42, "payload"], + [null, null, null, "both1", null, "payload"], + [null, null, null, "both2", null, "payload"], + [null, null, null, "right_only", null, "payload"], + [null, null, null, "right_only", 0, "payload"], + [null, null, null, "right_only", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Left semi join. + JoinType join_type = JoinType::LEFT_SEMI; + auto expected = ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ + ["both1", 0, "payload"], + ["both1", 42, "payload"], + ["both2", 0, "payload"], + ["both2", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Left anti join. + JoinType join_type = JoinType::LEFT_ANTI; + auto expected = ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ + [null, null, "payload"], + [null, 0, "payload"], + [null, 42, "payload"], + ["left_only", null, "payload"], + ["left_only", 0, "payload"], + ["left_only", 42, "payload"], + ["both1", null, "payload"], + ["both2", null, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Right semi join. + JoinType join_type = JoinType::RIGHT_SEMI; + auto expected = ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ + ["both1", 0, "payload"], + ["both1", 42, "payload"], + ["both2", 0, "payload"], + ["both2", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Right anti join. + JoinType join_type = JoinType::RIGHT_ANTI; + auto expected = ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ + [null, null, "payload"], + [null, 0, "payload"], + [null, 42, "payload"], + ["both1", null, "payload"], + ["both2", null, "payload"], + ["right_only", null, "payload"], + ["right_only", 0, "payload"], + ["right_only", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + } + } + + { + // Literal false, null, and scalar false, null. + for (Expression filter : + {literal(false), literal(NullScalar()), equal(literal(0), literal(1)), + equal(literal(1), literal(NullScalar()))}) { + std::vector<FieldRef> left_keys{"l_key", "l_filter"}, + right_keys{"r_key", "r_filter"}; + { + // Inner join. + JoinType join_type = JoinType::INNER; + auto expected = ExecBatchFromJSON( + {utf8(), int32(), utf8(), utf8(), int32(), utf8()}, R"([])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Left outer join. + JoinType join_type = JoinType::LEFT_OUTER; + auto expected = + ExecBatchFromJSON({utf8(), int32(), utf8(), utf8(), int32(), utf8()}, R"([ + [null, null, "payload", null, null, null], + [null, 0, "payload", null, null, null], + [null, 42, "payload", null, null, null], + ["left_only", null, "payload", null, null, null], + ["left_only", 0, "payload", null, null, null], + ["left_only", 42, "payload", null, null, null], + ["both1", null, "payload", null, null, null], + ["both1", 0, "payload", null, null, null], + ["both1", 42, "payload", null, null, null], + ["both2", null, "payload", null, null, null], + ["both2", 0, "payload", null, null, null], + ["both2", 42, "payload", null, null, null]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Right outer join. + JoinType join_type = JoinType::RIGHT_OUTER; + auto expected = + ExecBatchFromJSON({utf8(), int32(), utf8(), utf8(), int32(), utf8()}, R"([ + [null, null, null, null, null, "payload"], + [null, null, null, null, 0, "payload"], + [null, null, null, null, 42, "payload"], + [null, null, null, "both1", null, "payload"], + [null, null, null, "both1", 0, "payload"], + [null, null, null, "both1", 42, "payload"], + [null, null, null, "both2", null, "payload"], + [null, null, null, "both2", 0, "payload"], + [null, null, null, "both2", 42, "payload"], + [null, null, null, "right_only", null, "payload"], + [null, null, null, "right_only", 0, "payload"], + [null, null, null, "right_only", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Full outer join. + JoinType join_type = JoinType::FULL_OUTER; + auto expected = + ExecBatchFromJSON({utf8(), int32(), utf8(), utf8(), int32(), utf8()}, R"([ + [null, null, "payload", null, null, null], + [null, 0, "payload", null, null, null], + [null, 42, "payload", null, null, null], + ["left_only", null, "payload", null, null, null], + ["left_only", 0, "payload", null, null, null], + ["left_only", 42, "payload", null, null, null], + ["both1", null, "payload", null, null, null], + ["both1", 0, "payload", null, null, null], + ["both1", 42, "payload", null, null, null], + ["both2", null, "payload", null, null, null], + ["both2", 0, "payload", null, null, null], + ["both2", 42, "payload", null, null, null], + [null, null, null, null, null, "payload"], + [null, null, null, null, 0, "payload"], + [null, null, null, null, 42, "payload"], + [null, null, null, "both1", null, "payload"], + [null, null, null, "both1", 0, "payload"], + [null, null, null, "both1", 42, "payload"], + [null, null, null, "both2", null, "payload"], + [null, null, null, "both2", 0, "payload"], + [null, null, null, "both2", 42, "payload"], + [null, null, null, "right_only", null, "payload"], + [null, null, null, "right_only", 0, "payload"], + [null, null, null, "right_only", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Left semi join. + JoinType join_type = JoinType::LEFT_SEMI; + auto expected = ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Left anti join. + JoinType join_type = JoinType::LEFT_ANTI; + auto expected = ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ + [null, null, "payload"], + [null, 0, "payload"], + [null, 42, "payload"], + ["left_only", null, "payload"], + ["left_only", 0, "payload"], + ["left_only", 42, "payload"], + ["both1", null, "payload"], + ["both1", 0, "payload"], + ["both1", 42, "payload"], + ["both2", null, "payload"], + ["both2", 0, "payload"], + ["both2", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Right semi join. + JoinType join_type = JoinType::RIGHT_SEMI; + auto expected = ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Right anti join. + JoinType join_type = JoinType::RIGHT_ANTI; + auto expected = ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ + [null, null, "payload"], + [null, 0, "payload"], + [null, 42, "payload"], + ["both1", null, "payload"], + ["both1", 0, "payload"], + ["both1", 42, "payload"], + ["both2", null, "payload"], + ["both2", 0, "payload"], + ["both2", 42, "payload"], + ["right_only", null, "payload"], + ["right_only", 0, "payload"], + ["right_only", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + } + } + + { + // Non-trivial filters referring left columns only. + for (Expression filter : {equal(field_ref("l_filter"), literal(42)), + not_equal(literal(0), field_ref("l_filter"))}) { + std::vector<FieldRef> left_keys{"l_key"}, right_keys{"r_key"}; + { + // Inner join. + JoinType join_type = JoinType::INNER; + auto expected = + ExecBatchFromJSON({utf8(), int32(), utf8(), utf8(), int32(), utf8()}, R"([ + ["both1", 42, "payload", "both1", null, "payload"], + ["both1", 42, "payload", "both1", 0, "payload"], + ["both1", 42, "payload", "both1", 42, "payload"], + ["both2", 42, "payload", "both2", null, "payload"], + ["both2", 42, "payload", "both2", 0, "payload"], + ["both2", 42, "payload", "both2", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Left outer join. + JoinType join_type = JoinType::LEFT_OUTER; + auto expected = + ExecBatchFromJSON({utf8(), int32(), utf8(), utf8(), int32(), utf8()}, R"([ + [null, null, "payload", null, null, null], + [null, 0, "payload", null, null, null], + [null, 42, "payload", null, null, null], + ["left_only", null, "payload", null, null, null], + ["left_only", 0, "payload", null, null, null], + ["left_only", 42, "payload", null, null, null], + ["both1", null, "payload", null, null, null], + ["both1", 0, "payload", null, null, null], + ["both2", null, "payload", null, null, null], + ["both2", 0, "payload", null, null, null], + ["both1", 42, "payload", "both1", null, "payload"], + ["both1", 42, "payload", "both1", 0, "payload"], + ["both1", 42, "payload", "both1", 42, "payload"], + ["both2", 42, "payload", "both2", null, "payload"], + ["both2", 42, "payload", "both2", 0, "payload"], + ["both2", 42, "payload", "both2", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Right outer join. + JoinType join_type = JoinType::RIGHT_OUTER; + auto expected = + ExecBatchFromJSON({utf8(), int32(), utf8(), utf8(), int32(), utf8()}, R"([ + ["both1", 42, "payload", "both1", null, "payload"], + ["both1", 42, "payload", "both1", 0, "payload"], + ["both1", 42, "payload", "both1", 42, "payload"], + ["both2", 42, "payload", "both2", null, "payload"], + ["both2", 42, "payload", "both2", 0, "payload"], + ["both2", 42, "payload", "both2", 42, "payload"], + [null, null, null, null, null, "payload"], + [null, null, null, null, 0, "payload"], + [null, null, null, null, 42, "payload"], + [null, null, null, "right_only", null, "payload"], + [null, null, null, "right_only", 0, "payload"], + [null, null, null, "right_only", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Full outer join. + JoinType join_type = JoinType::FULL_OUTER; + auto expected = + ExecBatchFromJSON({utf8(), int32(), utf8(), utf8(), int32(), utf8()}, R"([ + [null, null, "payload", null, null, null], + [null, 0, "payload", null, null, null], + [null, 42, "payload", null, null, null], + ["left_only", null, "payload", null, null, null], + ["left_only", 0, "payload", null, null, null], + ["left_only", 42, "payload", null, null, null], + ["both1", null, "payload", null, null, null], + ["both1", 0, "payload", null, null, null], + ["both2", null, "payload", null, null, null], + ["both2", 0, "payload", null, null, null], + ["both1", 42, "payload", "both1", null, "payload"], + ["both1", 42, "payload", "both1", 0, "payload"], + ["both1", 42, "payload", "both1", 42, "payload"], + ["both2", 42, "payload", "both2", null, "payload"], + ["both2", 42, "payload", "both2", 0, "payload"], + ["both2", 42, "payload", "both2", 42, "payload"], + [null, null, null, null, null, "payload"], + [null, null, null, null, 0, "payload"], + [null, null, null, null, 42, "payload"], + [null, null, null, "right_only", null, "payload"], + [null, null, null, "right_only", 0, "payload"], + [null, null, null, "right_only", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Left semi join. + JoinType join_type = JoinType::LEFT_SEMI; + auto expected = ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ + ["both1", 42, "payload"], + ["both2", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Left anti join. + JoinType join_type = JoinType::LEFT_ANTI; + auto expected = ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ + [null, null, "payload"], + [null, 0, "payload"], + [null, 42, "payload"], + ["left_only", null, "payload"], + ["left_only", 0, "payload"], + ["left_only", 42, "payload"], + ["both1", null, "payload"], + ["both1", 0, "payload"], + ["both2", null, "payload"], + ["both2", 0, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Right semi join. + JoinType join_type = JoinType::RIGHT_SEMI; + auto expected = ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ + ["both1", null, "payload"], + ["both1", 0, "payload"], + ["both1", 42, "payload"], + ["both2", null, "payload"], + ["both2", 0, "payload"], + ["both2", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Right anti join. + JoinType join_type = JoinType::RIGHT_ANTI; + auto expected = ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ + [null, null, "payload"], + [null, 0, "payload"], + [null, 42, "payload"], + ["right_only", null, "payload"], + ["right_only", 0, "payload"], + ["right_only", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + } + } + + { + // Non-trivial filters referring right columns only. + for (Expression filter : {equal(field_ref("r_filter"), literal(42)), + not_equal(literal(0), field_ref("r_filter"))}) { + std::vector<FieldRef> left_keys{"l_key"}, right_keys{"r_key"}; + { + // Inner join. + JoinType join_type = JoinType::INNER; + auto expected = + ExecBatchFromJSON({utf8(), int32(), utf8(), utf8(), int32(), utf8()}, R"([ + ["both1", null, "payload", "both1", 42, "payload"], + ["both1", 0, "payload", "both1", 42, "payload"], + ["both1", 42, "payload", "both1", 42, "payload"], + ["both2", null, "payload", "both2", 42, "payload"], + ["both2", 0, "payload", "both2", 42, "payload"], + ["both2", 42, "payload", "both2", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Left outer join. + JoinType join_type = JoinType::LEFT_OUTER; + auto expected = + ExecBatchFromJSON({utf8(), int32(), utf8(), utf8(), int32(), utf8()}, R"([ + [null, null, "payload", null, null, null], + [null, 0, "payload", null, null, null], + [null, 42, "payload", null, null, null], + ["left_only", null, "payload", null, null, null], + ["left_only", 0, "payload", null, null, null], + ["left_only", 42, "payload", null, null, null], + ["both1", null, "payload", "both1", 42, "payload"], + ["both1", 0, "payload", "both1", 42, "payload"], + ["both1", 42, "payload", "both1", 42, "payload"], + ["both2", null, "payload", "both2", 42, "payload"], + ["both2", 0, "payload", "both2", 42, "payload"], + ["both2", 42, "payload", "both2", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Right outer join. + JoinType join_type = JoinType::RIGHT_OUTER; + auto expected = + ExecBatchFromJSON({utf8(), int32(), utf8(), utf8(), int32(), utf8()}, R"([ + ["both1", null, "payload", "both1", 42, "payload"], + ["both1", 0, "payload", "both1", 42, "payload"], + ["both1", 42, "payload", "both1", 42, "payload"], + ["both2", null, "payload", "both2", 42, "payload"], + ["both2", 0, "payload", "both2", 42, "payload"], + ["both2", 42, "payload", "both2", 42, "payload"], + [null, null, null, "both1", null, "payload"], + [null, null, null, "both1", 0, "payload"], + [null, null, null, "both2", null, "payload"], + [null, null, null, "both2", 0, "payload"], + [null, null, null, null, null, "payload"], + [null, null, null, null, 0, "payload"], + [null, null, null, null, 42, "payload"], + [null, null, null, "right_only", null, "payload"], + [null, null, null, "right_only", 0, "payload"], + [null, null, null, "right_only", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Full outer join. + JoinType join_type = JoinType::FULL_OUTER; + auto expected = + ExecBatchFromJSON({utf8(), int32(), utf8(), utf8(), int32(), utf8()}, R"([ + [null, null, "payload", null, null, null], + [null, 0, "payload", null, null, null], + [null, 42, "payload", null, null, null], + ["left_only", null, "payload", null, null, null], + ["left_only", 0, "payload", null, null, null], + ["left_only", 42, "payload", null, null, null], + ["both1", null, "payload", "both1", 42, "payload"], + ["both1", 0, "payload", "both1", 42, "payload"], + ["both1", 42, "payload", "both1", 42, "payload"], + ["both2", null, "payload", "both2", 42, "payload"], + ["both2", 0, "payload", "both2", 42, "payload"], + ["both2", 42, "payload", "both2", 42, "payload"], + [null, null, null, "both1", null, "payload"], + [null, null, null, "both1", 0, "payload"], + [null, null, null, "both2", null, "payload"], + [null, null, null, "both2", 0, "payload"], + [null, null, null, null, null, "payload"], + [null, null, null, null, 0, "payload"], + [null, null, null, null, 42, "payload"], + [null, null, null, "right_only", null, "payload"], + [null, null, null, "right_only", 0, "payload"], + [null, null, null, "right_only", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Left semi join. + JoinType join_type = JoinType::LEFT_SEMI; + auto expected = ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ + ["both1", null, "payload"], + ["both1", 0, "payload"], + ["both1", 42, "payload"], + ["both2", null, "payload"], + ["both2", 0, "payload"], + ["both2", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Left anti join. + JoinType join_type = JoinType::LEFT_ANTI; + auto expected = ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ + [null, null, "payload"], + [null, 0, "payload"], + [null, 42, "payload"], + ["left_only", null, "payload"], + ["left_only", 0, "payload"], + ["left_only", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Right semi join. + JoinType join_type = JoinType::RIGHT_SEMI; + auto expected = ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ + ["both1", 42, "payload"], + ["both2", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Right anti join. + JoinType join_type = JoinType::RIGHT_ANTI; + auto expected = ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ + [null, null, "payload"], + [null, 0, "payload"], + [null, 42, "payload"], + ["both1", null, "payload"], + ["both1", 0, "payload"], + ["both2", null, "payload"], + ["both2", 0, "payload"], + ["right_only", null, "payload"], + ["right_only", 0, "payload"], + ["right_only", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + } + } + + { + // Non-trivial filters referring both left and right columns. + for (Expression filter : + {equal(field_ref("l_filter"), field_ref("r_filter")), + equal(call("subtract", {field_ref("l_filter"), field_ref("r_filter")}), + literal(0))}) { + std::vector<FieldRef> left_keys{"l_key"}, right_keys{"r_key"}; + { + // Inner join. + JoinType join_type = JoinType::INNER; + auto expected = + ExecBatchFromJSON({utf8(), int32(), utf8(), utf8(), int32(), utf8()}, R"([ + ["both1", 0, "payload", "both1", 0, "payload"], + ["both1", 42, "payload", "both1", 42, "payload"], + ["both2", 0, "payload", "both2", 0, "payload"], + ["both2", 42, "payload", "both2", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Left outer join. + JoinType join_type = JoinType::LEFT_OUTER; + auto expected = + ExecBatchFromJSON({utf8(), int32(), utf8(), utf8(), int32(), utf8()}, R"([ + [null, null, "payload", null, null, null], + [null, 0, "payload", null, null, null], + [null, 42, "payload", null, null, null], + ["left_only", null, "payload", null, null, null], + ["left_only", 0, "payload", null, null, null], + ["left_only", 42, "payload", null, null, null], + ["both1", null, "payload", null, null, null], + ["both2", null, "payload", null, null, null], + ["both1", 0, "payload", "both1", 0, "payload"], + ["both1", 42, "payload", "both1", 42, "payload"], + ["both2", 0, "payload", "both2", 0, "payload"], + ["both2", 42, "payload", "both2", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Right outer join. + JoinType join_type = JoinType::RIGHT_OUTER; + auto expected = + ExecBatchFromJSON({utf8(), int32(), utf8(), utf8(), int32(), utf8()}, R"([ + ["both1", 0, "payload", "both1", 0, "payload"], + ["both1", 42, "payload", "both1", 42, "payload"], + ["both2", 0, "payload", "both2", 0, "payload"], + ["both2", 42, "payload", "both2", 42, "payload"], + [null, null, null, null, null, "payload"], + [null, null, null, null, 0, "payload"], + [null, null, null, null, 42, "payload"], + [null, null, null, "both1", null, "payload"], + [null, null, null, "both2", null, "payload"], + [null, null, null, "right_only", null, "payload"], + [null, null, null, "right_only", 0, "payload"], + [null, null, null, "right_only", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Full outer join. + JoinType join_type = JoinType::FULL_OUTER; + auto expected = + ExecBatchFromJSON({utf8(), int32(), utf8(), utf8(), int32(), utf8()}, R"([ + [null, null, "payload", null, null, null], + [null, 0, "payload", null, null, null], + [null, 42, "payload", null, null, null], + ["left_only", null, "payload", null, null, null], + ["left_only", 0, "payload", null, null, null], + ["left_only", 42, "payload", null, null, null], + ["both1", null, "payload", null, null, null], + ["both2", null, "payload", null, null, null], + ["both1", 0, "payload", "both1", 0, "payload"], + ["both1", 42, "payload", "both1", 42, "payload"], + ["both2", 0, "payload", "both2", 0, "payload"], + ["both2", 42, "payload", "both2", 42, "payload"], + [null, null, null, null, null, "payload"], + [null, null, null, null, 0, "payload"], + [null, null, null, null, 42, "payload"], + [null, null, null, "both1", null, "payload"], + [null, null, null, "both2", null, "payload"], + [null, null, null, "right_only", null, "payload"], + [null, null, null, "right_only", 0, "payload"], + [null, null, null, "right_only", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Left semi join. + JoinType join_type = JoinType::LEFT_SEMI; + auto expected = ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ + ["both1", 0, "payload"], + ["both1", 42, "payload"], + ["both2", 0, "payload"], + ["both2", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Left anti join. + JoinType join_type = JoinType::LEFT_ANTI; + auto expected = ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ + [null, null, "payload"], + [null, 0, "payload"], + [null, 42, "payload"], + ["left_only", null, "payload"], + ["left_only", 0, "payload"], + ["left_only", 42, "payload"], + ["both1", null, "payload"], + ["both2", null, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Right semi join. + JoinType join_type = JoinType::RIGHT_SEMI; + auto expected = ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ + ["both1", 0, "payload"], + ["both1", 42, "payload"], + ["both2", 0, "payload"], + ["both2", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } + + { + // Right anti join. + JoinType join_type = JoinType::RIGHT_ANTI; + auto expected = ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ + [null, null, "payload"], + [null, 0, "payload"], + [null, 42, "payload"], + ["both1", null, "payload"], + ["both2", null, "payload"], + ["right_only", null, "payload"], + ["right_only", 0, "payload"], + ["right_only", 42, "payload"]])"); + for (const auto& projector : projectors) { + runner.Run(join_type, left_keys, right_keys, projector.LeftOutput(join_type), + projector.RightOutput(join_type), filter, + {projector.Project(join_type, expected)}); + } + } Review Comment: I appreciate the extensive set of readable tests, thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org