westonpace commented on code in PR #14186: URL: https://github.com/apache/arrow/pull/14186#discussion_r978980117
########## cpp/src/arrow/engine/substrait/relation_internal.cc: ########## @@ -541,6 +541,52 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet& std::move(aggregate_schema)); } + case substrait::Rel::RelTypeCase::kSet: { + const auto& set = rel.set(); + RETURN_NOT_OK(CheckRelCommon(set)); + + if (set.inputs_size() < 2) { + return Status::Invalid( + "substrait::SetRel with inadequate number of input relations, ", + set.inputs_size()); + } + substrait::SetRel_SetOp op = set.op(); + // Note: at the moment Acero only supports UNION_ALL operation + switch (op) { + case substrait::SetRel::SET_OP_UNSPECIFIED: + return Status::NotImplemented("NotImplemented union type"); Review Comment: Maybe you can use [EnumToString](https://github.com/apache/arrow/blob/356e7f836c145966ebbeb65c3b65d82348e4234e/cpp/src/arrow/engine/substrait/expression_internal.cc#L81) to have a better error message. ########## cpp/src/arrow/engine/substrait/serde_test.cc: ########## @@ -3082,5 +3082,125 @@ TEST(Substrait, AggregateRelEmit) { buf, {}, conversion_options); } +TEST(Substrait, SetRelationBasic) { + compute::ExecContext exec_context; + auto dummy_schema = + schema({field("A", int32()), field("B", int32()), field("C", int32())}); + + // creating a dummy dataset using a dummy table + auto table1 = TableFromJSON(dummy_schema, {R"([ + [10, 1, 80], + [20, 2, 70], + [30, 3, 30], + [40, 4, 20], + [40, 5, 40], + [20, 6, 20], + [30, 7, 30] + ])"}); + + auto table2 = TableFromJSON(dummy_schema, {R"([ + [11, 1, 82], + [21, 2, 72], + [31, 3, 32], + [41, 4, 22], + [41, 5, 42], + [21, 6, 22], + [31, 7, 32] + ])"}); + + NamedTableProvider table_provider = [table1, + table2](const std::vector<std::string>& names) { + std::shared_ptr<Table> output_table; + for (const auto& name : names) { + if (name == "T1") { + output_table = table1; + } + if (name == "T2") { + output_table = table2; + } + } + std::shared_ptr<compute::ExecNodeOptions> options = + 
std::make_shared<compute::TableSourceNodeOptions>(std::move(output_table)); + return compute::Declaration("table_source", {}, options, "mock_source"); + }; + + ConversionOptions conversion_options; + conversion_options.named_table_provider = std::move(table_provider); + + std::string substrait_json = R"({ + "relations": [{ + "root": { + "input": { + "set": { + "inputs": [{ + "read": { + "baseSchema": { + "names": ["FOO"], + "struct": { + "types": [{ + "i32": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }], + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "namedTable": { + "names": ["T1"] + } + } + }, { + "read": { + "baseSchema": { + "names": ["BAR"], + "struct": { + "types": [{ + "i32": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }], + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "namedTable": { + "names": ["T2"] + } + } + }], + "op": "SET_OP_UNION_ALL" + } + }, + "names": ["FOO"] + } + }] + })"; + + ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", substrait_json)); + + auto expected_table = TableFromJSON(dummy_schema, {R"([ + [11, 1, 82], + [21, 2, 72], + [31, 3, 32], + [41, 4, 22], + [41, 5, 42], + [21, 6, 22], + [31, 7, 32], + [10, 1, 80], + [20, 2, 70], + [30, 3, 30], + [40, 4, 20], + [40, 5, 40], + [20, 6, 20], + [30, 7, 30] + ])"}); + + CheckRoundTripResult(dummy_schema, std::move(expected_table), exec_context, buf, {}, Review Comment: I don't think the union node makes any guarantee on ordering so you might need to use the unordered round trip result that you added to the tests for glob. 
########## cpp/src/arrow/engine/substrait/relation_internal.cc: ########## @@ -541,6 +541,52 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet& std::move(aggregate_schema)); } + case substrait::Rel::RelTypeCase::kSet: { + const auto& set = rel.set(); + RETURN_NOT_OK(CheckRelCommon(set)); + + if (set.inputs_size() < 2) { + return Status::Invalid( + "substrait::SetRel with inadequate number of input relations, ", + set.inputs_size()); + } + substrait::SetRel_SetOp op = set.op(); + // Note: at the moment Acero only supports UNION_ALL operation + switch (op) { + case substrait::SetRel::SET_OP_UNSPECIFIED: + return Status::NotImplemented("NotImplemented union type"); + case substrait::SetRel::SET_OP_MINUS_PRIMARY: + return Status::NotImplemented("NotImplemented union type"); + case substrait::SetRel::SET_OP_MINUS_MULTISET: + return Status::NotImplemented("NotImplemented union type"); + case substrait::SetRel::SET_OP_INTERSECTION_PRIMARY: + return Status::NotImplemented("NotImplemented union type"); + case substrait::SetRel::SET_OP_INTERSECTION_MULTISET: + return Status::NotImplemented("NotImplemented union type"); + case substrait::SetRel::SET_OP_UNION_DISTINCT: + return Status::NotImplemented("NotImplemented union type"); + case substrait::SetRel::SET_OP_UNION_ALL: + break; + default: + return Status::Invalid("Unsupported union type"); + } + int input_size = set.inputs_size(); + compute::Declaration union_declr{"union", compute::ExecNodeOptions{}}; + std::shared_ptr<Schema> union_schema; + for (int input_id = 0; input_id < input_size; input_id++) { + ARROW_ASSIGN_OR_RAISE( + auto input, FromProto(set.inputs(input_id), ext_set, conversion_options)); + union_declr.inputs.emplace_back(std::move(input.declaration)); + if (union_schema == nullptr) { + union_schema = input.output_schema; Review Comment: This is fine. 
My first thought was that we could validate that the output schemas of all the inputs are identical, but the union node itself will do this when it is eventually converted into an exec plan. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org