westonpace commented on code in PR #13130:
URL: https://github.com/apache/arrow/pull/13130#discussion_r914295005
##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const
substrait::Rel& rel,
join_dec.inputs.emplace_back(std::move(right));
return std::move(join_dec);
}
+ case substrait::Rel::RelTypeCase::kAggregate: {
+ const auto& aggregate = rel.aggregate();
+ RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+ if (!aggregate.has_input()) {
+ return Status::Invalid("substrait::AggregateRel with no input
relation");
+ }
+
+ ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+ if (aggregate.groupings_size() > 1) {
+ return Status::Invalid("Grouping sets not supported.");
+ }
+ std::vector<FieldRef> keys;
+ auto group = aggregate.groupings(0);
+ keys.reserve(group.grouping_expressions_size());
+ for (int exp_id = 0; exp_id < group.grouping_expressions_size();
exp_id++) {
+ const auto& expr = FromProto(group.grouping_expressions(exp_id),
ext_set);
Review Comment:
Use `ARROW_ASSIGN_OR_RAISE` here instead of using `->` later. Otherwise, if
something goes wrong converting the expression, it will result in an abort
(which we don't want) instead of an invalid status (which we would want).
##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const
substrait::Rel& rel,
join_dec.inputs.emplace_back(std::move(right));
return std::move(join_dec);
}
+ case substrait::Rel::RelTypeCase::kAggregate: {
+ const auto& aggregate = rel.aggregate();
+ RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+ if (!aggregate.has_input()) {
+ return Status::Invalid("substrait::AggregateRel with no input
relation");
+ }
+
+ ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+ if (aggregate.groupings_size() > 1) {
+ return Status::Invalid("Grouping sets not supported.");
+ }
+ std::vector<FieldRef> keys;
+ auto group = aggregate.groupings(0);
+ keys.reserve(group.grouping_expressions_size());
+ for (int exp_id = 0; exp_id < group.grouping_expressions_size();
exp_id++) {
+ const auto& expr = FromProto(group.grouping_expressions(exp_id),
ext_set);
+ const auto& field_ref = expr->field_ref();
Review Comment:
```suggestion
const auto* field_ref = expr->field_ref();
```
##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const
substrait::Rel& rel,
join_dec.inputs.emplace_back(std::move(right));
return std::move(join_dec);
}
+ case substrait::Rel::RelTypeCase::kAggregate: {
+ const auto& aggregate = rel.aggregate();
+ RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+ if (!aggregate.has_input()) {
+ return Status::Invalid("substrait::AggregateRel with no input
relation");
+ }
+
+ ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+ if (aggregate.groupings_size() > 1) {
+ return Status::Invalid("Grouping sets not supported.");
Review Comment:
```suggestion
return Status::NotImplemented("Grouping sets not supported.");
```
##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const
substrait::Rel& rel,
join_dec.inputs.emplace_back(std::move(right));
return std::move(join_dec);
}
+ case substrait::Rel::RelTypeCase::kAggregate: {
+ const auto& aggregate = rel.aggregate();
+ RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+ if (!aggregate.has_input()) {
+ return Status::Invalid("substrait::AggregateRel with no input
relation");
+ }
+
+ ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+ if (aggregate.groupings_size() > 1) {
+ return Status::Invalid("Grouping sets not supported.");
+ }
+ std::vector<FieldRef> keys;
+ auto group = aggregate.groupings(0);
+ keys.reserve(group.grouping_expressions_size());
+ for (int exp_id = 0; exp_id < group.grouping_expressions_size();
exp_id++) {
+ const auto& expr = FromProto(group.grouping_expressions(exp_id),
ext_set);
+ const auto& field_ref = expr->field_ref();
+ if (field_ref) {
+ keys.emplace_back(std::move(*field_ref));
+ } else {
+ return Status::Invalid(
+ "Only accept a direct reference as the grouping expression for
aggregates");
+ }
+ }
+ // denotes how many unique aggregation functions are used
+ // measure_id refers to the corresponding function in the
+ // extensionsion
+ int measure_size = aggregate.measures_size();
+ std::vector<compute::Aggregate> aggregates;
+ aggregates.reserve(measure_size);
+ for (int measure_id = 0; measure_id < measure_size; measure_id++) {
+ const auto& agg_measure = aggregate.measures(measure_id);
+ if (agg_measure.has_measure()) {
+ if (agg_measure.has_filter()) {
+ return Status::Invalid("Aggregate filters are not supported.");
Review Comment:
```suggestion
return Status::NotImplemented("Aggregate filters are not supported.");
```
##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const
substrait::Rel& rel,
join_dec.inputs.emplace_back(std::move(right));
return std::move(join_dec);
}
+ case substrait::Rel::RelTypeCase::kAggregate: {
+ const auto& aggregate = rel.aggregate();
+ RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+ if (!aggregate.has_input()) {
+ return Status::Invalid("substrait::AggregateRel with no input
relation");
+ }
+
+ ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+ if (aggregate.groupings_size() > 1) {
+ return Status::Invalid("Grouping sets not supported.");
+ }
+ std::vector<FieldRef> keys;
+ auto group = aggregate.groupings(0);
+ keys.reserve(group.grouping_expressions_size());
+ for (int exp_id = 0; exp_id < group.grouping_expressions_size();
exp_id++) {
+ const auto& expr = FromProto(group.grouping_expressions(exp_id),
ext_set);
+ const auto& field_ref = expr->field_ref();
+ if (field_ref) {
+ keys.emplace_back(std::move(*field_ref));
+ } else {
+ return Status::Invalid(
+ "Only accept a direct reference as the grouping expression for
aggregates");
+ }
+ }
+ // denotes how many unique aggregation functions are used
+ // measure_id refers to the corresponding function in the
+ // extensionsion
+ int measure_size = aggregate.measures_size();
+ std::vector<compute::Aggregate> aggregates;
+ aggregates.reserve(measure_size);
+ for (int measure_id = 0; measure_id < measure_size; measure_id++) {
+ const auto& agg_measure = aggregate.measures(measure_id);
+ if (agg_measure.has_measure()) {
+ if (agg_measure.has_filter()) {
+ return Status::Invalid("Aggregate filters are not supported.");
+ }
+ const auto& agg_func = agg_measure.measure();
+ if (agg_func.args_size() != 1) {
+ return Status::Invalid("Aggregate function must be a unary
function.");
+ }
+ int func_reference = agg_func.function_reference();
+ ARROW_ASSIGN_OR_RAISE(auto func_record,
ext_set.DecodeFunction(func_reference));
+ // aggreagte function name
+ auto func_name = std::string(func_record.id.name);
+ // aggregate output column name
+ std::string agg_col_name =
+ func_name + "(" + std::to_string(func_reference) + ")";
Review Comment:
I'm not sure there is any advantage to giving these columns a name versus
just leaving them as empty strings.
##########
cpp/src/arrow/engine/substrait/extension_set.cc:
##########
@@ -443,8 +443,9 @@ struct DefaultExtensionIdRegistry : ExtensionIdRegistryImpl
{
// ARROW-15535.
for (util::string_view name : {
"add",
- "equal",
- "is_not_distinct_from",
+ "equal", // added to support join operator
Review Comment:
The comments are probably not needed as this will change significantly in
the function mapping PR but they don't hurt.
##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const
substrait::Rel& rel,
join_dec.inputs.emplace_back(std::move(right));
return std::move(join_dec);
}
+ case substrait::Rel::RelTypeCase::kAggregate: {
+ const auto& aggregate = rel.aggregate();
+ RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+ if (!aggregate.has_input()) {
+ return Status::Invalid("substrait::AggregateRel with no input
relation");
+ }
+
+ ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+ if (aggregate.groupings_size() > 1) {
+ return Status::Invalid("Grouping sets not supported.");
+ }
+ std::vector<FieldRef> keys;
+ auto group = aggregate.groupings(0);
+ keys.reserve(group.grouping_expressions_size());
+ for (int exp_id = 0; exp_id < group.grouping_expressions_size();
exp_id++) {
+ const auto& expr = FromProto(group.grouping_expressions(exp_id),
ext_set);
+ const auto& field_ref = expr->field_ref();
+ if (field_ref) {
+ keys.emplace_back(std::move(*field_ref));
+ } else {
+ return Status::Invalid(
+ "Only accept a direct reference as the grouping expression for
aggregates");
+ }
+ }
+ // denotes how many unique aggregation functions are used
+ // measure_id refers to the corresponding function in the
+ // extensionsion
Review Comment:
```suggestion
```
I'm not sure this comment is helpful. Maybe something simple, like "A
Substrait measure is equivalent to an Acero aggregate", but even that seems
excessive.
##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const
substrait::Rel& rel,
join_dec.inputs.emplace_back(std::move(right));
return std::move(join_dec);
}
+ case substrait::Rel::RelTypeCase::kAggregate: {
+ const auto& aggregate = rel.aggregate();
+ RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+ if (!aggregate.has_input()) {
+ return Status::Invalid("substrait::AggregateRel with no input
relation");
+ }
+
+ ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+ if (aggregate.groupings_size() > 1) {
+ return Status::Invalid("Grouping sets not supported.");
+ }
+ std::vector<FieldRef> keys;
+ auto group = aggregate.groupings(0);
+ keys.reserve(group.grouping_expressions_size());
+ for (int exp_id = 0; exp_id < group.grouping_expressions_size();
exp_id++) {
+ const auto& expr = FromProto(group.grouping_expressions(exp_id),
ext_set);
+ const auto& field_ref = expr->field_ref();
+ if (field_ref) {
+ keys.emplace_back(std::move(*field_ref));
+ } else {
+ return Status::Invalid(
+ "Only accept a direct reference as the grouping expression for
aggregates");
Review Comment:
```suggestion
"The grouping expression for an aggregate must be a direct reference.");
```
Minor nit: wording
##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const
substrait::Rel& rel,
join_dec.inputs.emplace_back(std::move(right));
return std::move(join_dec);
}
+ case substrait::Rel::RelTypeCase::kAggregate: {
+ const auto& aggregate = rel.aggregate();
+ RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+ if (!aggregate.has_input()) {
+ return Status::Invalid("substrait::AggregateRel with no input
relation");
+ }
+
+ ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+ if (aggregate.groupings_size() > 1) {
+ return Status::Invalid("Grouping sets not supported.");
+ }
+ std::vector<FieldRef> keys;
+ auto group = aggregate.groupings(0);
+ keys.reserve(group.grouping_expressions_size());
+ for (int exp_id = 0; exp_id < group.grouping_expressions_size();
exp_id++) {
+ const auto& expr = FromProto(group.grouping_expressions(exp_id),
ext_set);
+ const auto& field_ref = expr->field_ref();
+ if (field_ref) {
+ keys.emplace_back(std::move(*field_ref));
+ } else {
+ return Status::Invalid(
+ "Only accept a direct reference as the grouping expression for
aggregates");
+ }
+ }
+ // denotes how many unique aggregation functions are used
+ // measure_id refers to the corresponding function in the
+ // extensionsion
+ int measure_size = aggregate.measures_size();
+ std::vector<compute::Aggregate> aggregates;
+ aggregates.reserve(measure_size);
+ for (int measure_id = 0; measure_id < measure_size; measure_id++) {
+ const auto& agg_measure = aggregate.measures(measure_id);
+ if (agg_measure.has_measure()) {
+ if (agg_measure.has_filter()) {
+ return Status::Invalid("Aggregate filters are not supported.");
+ }
+ const auto& agg_func = agg_measure.measure();
+ if (agg_func.args_size() != 1) {
+ return Status::Invalid("Aggregate function must be a unary
function.");
Review Comment:
```suggestion
return Status::NotImplemented("Aggregate function must be a unary function.");
```
##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const
substrait::Rel& rel,
join_dec.inputs.emplace_back(std::move(right));
return std::move(join_dec);
}
+ case substrait::Rel::RelTypeCase::kAggregate: {
+ const auto& aggregate = rel.aggregate();
+ RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+ if (!aggregate.has_input()) {
+ return Status::Invalid("substrait::AggregateRel with no input
relation");
+ }
+
+ ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+ if (aggregate.groupings_size() > 1) {
+ return Status::Invalid("Grouping sets not supported.");
+ }
+ std::vector<FieldRef> keys;
+ auto group = aggregate.groupings(0);
+ keys.reserve(group.grouping_expressions_size());
+ for (int exp_id = 0; exp_id < group.grouping_expressions_size();
exp_id++) {
+ const auto& expr = FromProto(group.grouping_expressions(exp_id),
ext_set);
+ const auto& field_ref = expr->field_ref();
+ if (field_ref) {
+ keys.emplace_back(std::move(*field_ref));
+ } else {
+ return Status::Invalid(
+ "Only accept a direct reference as the grouping expression for
aggregates");
+ }
+ }
+ // denotes how many unique aggregation functions are used
+ // measure_id refers to the corresponding function in the
+ // extensionsion
+ int measure_size = aggregate.measures_size();
+ std::vector<compute::Aggregate> aggregates;
+ aggregates.reserve(measure_size);
+ for (int measure_id = 0; measure_id < measure_size; measure_id++) {
+ const auto& agg_measure = aggregate.measures(measure_id);
+ if (agg_measure.has_measure()) {
+ if (agg_measure.has_filter()) {
+ return Status::Invalid("Aggregate filters are not supported.");
+ }
+ const auto& agg_func = agg_measure.measure();
+ if (agg_func.args_size() != 1) {
+ return Status::Invalid("Aggregate function must be a unary
function.");
+ }
+ int func_reference = agg_func.function_reference();
+ ARROW_ASSIGN_OR_RAISE(auto func_record,
ext_set.DecodeFunction(func_reference));
+ // aggreagte function name
+ auto func_name = std::string(func_record.id.name);
+ // aggregate output column name
+ std::string agg_col_name =
+ func_name + "(" + std::to_string(func_reference) + ")";
+ // aggregate target
+ ARROW_ASSIGN_OR_RAISE(auto field_expr, FromProto(agg_func.args(0),
ext_set));
+ auto target = field_expr.field_ref();
+ if (!target) {
+ return Status::Invalid(
+ "Only accept a direct reference as the aggregate expression.");
+ }
+ // TODO: Implement function options in Substrait
Review Comment:
We shouldn't have a TODO without some kind of JIRA reference. I'm not sure
we need a TODO here though. Maybe just get rid of this line and keep the next
line about setting function options to nullptr.
##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const
substrait::Rel& rel,
join_dec.inputs.emplace_back(std::move(right));
return std::move(join_dec);
}
+ case substrait::Rel::RelTypeCase::kAggregate: {
+ const auto& aggregate = rel.aggregate();
+ RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+ if (!aggregate.has_input()) {
+ return Status::Invalid("substrait::AggregateRel with no input
relation");
+ }
+
+ ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+ if (aggregate.groupings_size() > 1) {
+ return Status::Invalid("Grouping sets not supported.");
+ }
+ std::vector<FieldRef> keys;
+ auto group = aggregate.groupings(0);
+ keys.reserve(group.grouping_expressions_size());
+ for (int exp_id = 0; exp_id < group.grouping_expressions_size();
exp_id++) {
+ const auto& expr = FromProto(group.grouping_expressions(exp_id),
ext_set);
+ const auto& field_ref = expr->field_ref();
+ if (field_ref) {
+ keys.emplace_back(std::move(*field_ref));
+ } else {
+ return Status::Invalid(
+ "Only accept a direct reference as the grouping expression for
aggregates");
+ }
+ }
+ // denotes how many unique aggregation functions are used
+ // measure_id refers to the corresponding function in the
+ // extensionsion
+ int measure_size = aggregate.measures_size();
+ std::vector<compute::Aggregate> aggregates;
+ aggregates.reserve(measure_size);
+ for (int measure_id = 0; measure_id < measure_size; measure_id++) {
+ const auto& agg_measure = aggregate.measures(measure_id);
+ if (agg_measure.has_measure()) {
+ if (agg_measure.has_filter()) {
+ return Status::Invalid("Aggregate filters are not supported.");
+ }
+ const auto& agg_func = agg_measure.measure();
+ if (agg_func.args_size() != 1) {
+ return Status::Invalid("Aggregate function must be a unary
function.");
+ }
+ int func_reference = agg_func.function_reference();
+ ARROW_ASSIGN_OR_RAISE(auto func_record,
ext_set.DecodeFunction(func_reference));
+ // aggreagte function name
+ auto func_name = std::string(func_record.id.name);
+ // aggregate output column name
+ std::string agg_col_name =
+ func_name + "(" + std::to_string(func_reference) + ")";
+ // aggregate target
+ ARROW_ASSIGN_OR_RAISE(auto field_expr, FromProto(agg_func.args(0),
ext_set));
+ auto target = field_expr.field_ref();
+ if (!target) {
+ return Status::Invalid(
+ "Only accept a direct reference as the aggregate expression.");
Review Comment:
```suggestion
"The input expression to an aggregate function must be a direct reference.");
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]