westonpace commented on code in PR #13130:
URL: https://github.com/apache/arrow/pull/13130#discussion_r914295005


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const 
substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input 
relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); 
exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), 
ext_set);

Review Comment:
   Use `ARROW_ASSIGN_OR_RAISE` here instead of dereferencing the result with
   `->` later.  Otherwise, if something goes wrong while converting the
   expression, the dereference will abort (which we don't want) instead of
   returning an invalid status (which we do want).



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const 
substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input 
relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); 
exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), 
ext_set);
+        const auto& field_ref = expr->field_ref();

Review Comment:
   ```suggestion
           const auto* field_ref = expr->field_ref();
   ```



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const 
substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input 
relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");

Review Comment:
   ```suggestion
           return Status::NotImplemented("Grouping sets not supported.");
   ```



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const 
substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input 
relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); 
exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), 
ext_set);
+        const auto& field_ref = expr->field_ref();
+        if (field_ref) {
+          keys.emplace_back(std::move(*field_ref));
+        } else {
+          return Status::Invalid(
+              "Only accept a direct reference as the grouping expression for 
aggregates");
+        }
+      }
+      // denotes how many unique aggregation functions are used
+      // measure_id refers to the corresponding function in the
+      // extensionsion
+      int measure_size = aggregate.measures_size();
+      std::vector<compute::Aggregate> aggregates;
+      aggregates.reserve(measure_size);
+      for (int measure_id = 0; measure_id < measure_size; measure_id++) {
+        const auto& agg_measure = aggregate.measures(measure_id);
+        if (agg_measure.has_measure()) {
+          if (agg_measure.has_filter()) {
+            return Status::Invalid("Aggregate filters are not supported.");

Review Comment:
   ```suggestion
               return Status::NotImplemented("Aggregate filters are not supported.");
   ```



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const 
substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input 
relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); 
exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), 
ext_set);
+        const auto& field_ref = expr->field_ref();
+        if (field_ref) {
+          keys.emplace_back(std::move(*field_ref));
+        } else {
+          return Status::Invalid(
+              "Only accept a direct reference as the grouping expression for 
aggregates");
+        }
+      }
+      // denotes how many unique aggregation functions are used
+      // measure_id refers to the corresponding function in the
+      // extensionsion
+      int measure_size = aggregate.measures_size();
+      std::vector<compute::Aggregate> aggregates;
+      aggregates.reserve(measure_size);
+      for (int measure_id = 0; measure_id < measure_size; measure_id++) {
+        const auto& agg_measure = aggregate.measures(measure_id);
+        if (agg_measure.has_measure()) {
+          if (agg_measure.has_filter()) {
+            return Status::Invalid("Aggregate filters are not supported.");
+          }
+          const auto& agg_func = agg_measure.measure();
+          if (agg_func.args_size() != 1) {
+            return Status::Invalid("Aggregate function must be a unary 
function.");
+          }
+          int func_reference = agg_func.function_reference();
+          ARROW_ASSIGN_OR_RAISE(auto func_record, 
ext_set.DecodeFunction(func_reference));
+          // aggreagte function name
+          auto func_name = std::string(func_record.id.name);
+          // aggregate output column name
+          std::string agg_col_name =
+              func_name + "(" + std::to_string(func_reference) + ")";

Review Comment:
   I'm not sure there is any advantage to giving these columns a name versus
   just leaving them as empty strings.



##########
cpp/src/arrow/engine/substrait/extension_set.cc:
##########
@@ -443,8 +443,9 @@ struct DefaultExtensionIdRegistry : ExtensionIdRegistryImpl 
{
     // ARROW-15535.
     for (util::string_view name : {
              "add",
-             "equal",
-             "is_not_distinct_from",
+             "equal",                 // added to support join operator

Review Comment:
   The comments are probably not needed, as this will change significantly in
   the function mapping PR, but they don't hurt.



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const 
substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input 
relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); 
exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), 
ext_set);
+        const auto& field_ref = expr->field_ref();
+        if (field_ref) {
+          keys.emplace_back(std::move(*field_ref));
+        } else {
+          return Status::Invalid(
+              "Only accept a direct reference as the grouping expression for 
aggregates");
+        }
+      }
+      // denotes how many unique aggregation functions are used
+      // measure_id refers to the corresponding function in the
+      // extensionsion

Review Comment:
   ```suggestion
   ```
   I'm not sure this comment is helpful.  Maybe something simple like "A
   Substrait measure is equivalent to an Acero aggregate", but even that seems
   excessive.



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const 
substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input 
relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); 
exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), 
ext_set);
+        const auto& field_ref = expr->field_ref();
+        if (field_ref) {
+          keys.emplace_back(std::move(*field_ref));
+        } else {
+          return Status::Invalid(
+              "Only accept a direct reference as the grouping expression for 
aggregates");

Review Comment:
   ```suggestion
                 "The grouping expression for an aggregate must be a direct reference.");
   ```
   Minor nit: wording



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const 
substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input 
relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); 
exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), 
ext_set);
+        const auto& field_ref = expr->field_ref();
+        if (field_ref) {
+          keys.emplace_back(std::move(*field_ref));
+        } else {
+          return Status::Invalid(
+              "Only accept a direct reference as the grouping expression for 
aggregates");
+        }
+      }
+      // denotes how many unique aggregation functions are used
+      // measure_id refers to the corresponding function in the
+      // extensionsion
+      int measure_size = aggregate.measures_size();
+      std::vector<compute::Aggregate> aggregates;
+      aggregates.reserve(measure_size);
+      for (int measure_id = 0; measure_id < measure_size; measure_id++) {
+        const auto& agg_measure = aggregate.measures(measure_id);
+        if (agg_measure.has_measure()) {
+          if (agg_measure.has_filter()) {
+            return Status::Invalid("Aggregate filters are not supported.");
+          }
+          const auto& agg_func = agg_measure.measure();
+          if (agg_func.args_size() != 1) {
+            return Status::Invalid("Aggregate function must be a unary 
function.");

Review Comment:
   ```suggestion
               return Status::NotImplemented("Aggregate function must be a unary function.");
   ```



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const 
substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input 
relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); 
exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), 
ext_set);
+        const auto& field_ref = expr->field_ref();
+        if (field_ref) {
+          keys.emplace_back(std::move(*field_ref));
+        } else {
+          return Status::Invalid(
+              "Only accept a direct reference as the grouping expression for 
aggregates");
+        }
+      }
+      // denotes how many unique aggregation functions are used
+      // measure_id refers to the corresponding function in the
+      // extensionsion
+      int measure_size = aggregate.measures_size();
+      std::vector<compute::Aggregate> aggregates;
+      aggregates.reserve(measure_size);
+      for (int measure_id = 0; measure_id < measure_size; measure_id++) {
+        const auto& agg_measure = aggregate.measures(measure_id);
+        if (agg_measure.has_measure()) {
+          if (agg_measure.has_filter()) {
+            return Status::Invalid("Aggregate filters are not supported.");
+          }
+          const auto& agg_func = agg_measure.measure();
+          if (agg_func.args_size() != 1) {
+            return Status::Invalid("Aggregate function must be a unary 
function.");
+          }
+          int func_reference = agg_func.function_reference();
+          ARROW_ASSIGN_OR_RAISE(auto func_record, 
ext_set.DecodeFunction(func_reference));
+          // aggreagte function name
+          auto func_name = std::string(func_record.id.name);
+          // aggregate output column name
+          std::string agg_col_name =
+              func_name + "(" + std::to_string(func_reference) + ")";
+          // aggregate target
+          ARROW_ASSIGN_OR_RAISE(auto field_expr, FromProto(agg_func.args(0), 
ext_set));
+          auto target = field_expr.field_ref();
+          if (!target) {
+            return Status::Invalid(
+                "Only accept a direct reference as the aggregate expression.");
+          }
+          // TODO: Implement function options in Substrait

Review Comment:
   We shouldn't have a TODO without some kind of JIRA reference.  I'm not sure
   we need a TODO here, though.  Maybe just get rid of this line and keep the
   next line about setting function options to `nullptr`.



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const 
substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input 
relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); 
exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), 
ext_set);
+        const auto& field_ref = expr->field_ref();
+        if (field_ref) {
+          keys.emplace_back(std::move(*field_ref));
+        } else {
+          return Status::Invalid(
+              "Only accept a direct reference as the grouping expression for 
aggregates");
+        }
+      }
+      // denotes how many unique aggregation functions are used
+      // measure_id refers to the corresponding function in the
+      // extensionsion
+      int measure_size = aggregate.measures_size();
+      std::vector<compute::Aggregate> aggregates;
+      aggregates.reserve(measure_size);
+      for (int measure_id = 0; measure_id < measure_size; measure_id++) {
+        const auto& agg_measure = aggregate.measures(measure_id);
+        if (agg_measure.has_measure()) {
+          if (agg_measure.has_filter()) {
+            return Status::Invalid("Aggregate filters are not supported.");
+          }
+          const auto& agg_func = agg_measure.measure();
+          if (agg_func.args_size() != 1) {
+            return Status::Invalid("Aggregate function must be a unary 
function.");
+          }
+          int func_reference = agg_func.function_reference();
+          ARROW_ASSIGN_OR_RAISE(auto func_record, 
ext_set.DecodeFunction(func_reference));
+          // aggreagte function name
+          auto func_name = std::string(func_record.id.name);
+          // aggregate output column name
+          std::string agg_col_name =
+              func_name + "(" + std::to_string(func_reference) + ")";
+          // aggregate target
+          ARROW_ASSIGN_OR_RAISE(auto field_expr, FromProto(agg_func.args(0), 
ext_set));
+          auto target = field_expr.field_ref();
+          if (!target) {
+            return Status::Invalid(
+                "Only accept a direct reference as the aggregate expression.");

Review Comment:
   ```suggestion
                   "The input expression to an aggregate function must be a direct reference.");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to