rtpsw commented on code in PR #34627:
URL: https://github.com/apache/arrow/pull/34627#discussion_r1151467714


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -730,64 +810,24 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& 
rel, const ExtensionSet&
       std::vector<compute::Aggregate> aggregates;
       aggregates.reserve(measure_size);
       // store aggregate fields to be used when output schema is created
-      std::vector<std::vector<int>> agg_src_fieldsets(measure_size);
+      std::vector<std::vector<int>> agg_src_fieldsets;
+      agg_src_fieldsets.reserve(measure_size);
       for (int measure_id = 0; measure_id < measure_size; measure_id++) {
         const auto& agg_measure = aggregate.measures(measure_id);
-        if (agg_measure.has_measure()) {
-          if (agg_measure.has_filter()) {
-            return Status::NotImplemented("Aggregate filters are not 
supported.");
-          }
-          const auto& agg_func = agg_measure.measure();
-          ARROW_ASSIGN_OR_RAISE(SubstraitCall aggregate_call,
-                                FromProto(agg_func, /*is_hash=*/!keys.empty(), 
ext_set,
-                                          conversion_options));
-          ExtensionIdRegistry::SubstraitAggregateToArrow converter;
-          if (aggregate_call.id().uri.empty() || aggregate_call.id().uri[0] == 
'/') {
-            ARROW_ASSIGN_OR_RAISE(
-                converter, 
ext_set.registry()->GetSubstraitAggregateToArrowFallback(
-                               aggregate_call.id().name));
-          } else {
-            ARROW_ASSIGN_OR_RAISE(
-                converter,
-                
ext_set.registry()->GetSubstraitAggregateToArrow(aggregate_call.id()));
-          }
-          ARROW_ASSIGN_OR_RAISE(compute::Aggregate arrow_agg, 
converter(aggregate_call));
-
-          // find aggregate field ids from schema
-          const auto& target = arrow_agg.target;
-          for (const auto& field_ref : target) {
-            ARROW_ASSIGN_OR_RAISE(auto match, 
field_ref.FindOne(*input_schema));
-            agg_src_fieldsets[measure_id].push_back(match[0]);
-          }
-
-          aggregates.push_back(std::move(arrow_agg));
-        } else {
-          return Status::Invalid("substrait::AggregateFunction not provided");
-        }
-      }
-      FieldVector output_fields;
-      output_fields.reserve(key_field_ids.size() + measure_size);
-      // extract aggregate fields to output schema
-      for (const auto& agg_src_fieldset : agg_src_fieldsets) {
-        for (int field : agg_src_fieldset) {
-          output_fields.emplace_back(input_schema->field(field));
-        }
-      }
-      // extract key fields to output schema
-      for (int key_field_id : key_field_ids) {
-        output_fields.emplace_back(input_schema->field(key_field_id));
+        ARROW_RETURN_NOT_OK(internal::ParseAggregateMeasure(
+            agg_measure, ext_set, conversion_options, 
/*is_hash=*/!keys.empty(),
+            input_schema, &aggregates, &agg_src_fieldsets));
       }
 
-      std::shared_ptr<Schema> aggregate_schema = 
schema(std::move(output_fields));
-
-      DeclarationInfo aggregate_declaration{
-          compute::Declaration::Sequence(
-              {std::move(input.declaration),
-               {"aggregate", compute::AggregateNodeOptions{aggregates, 
keys}}}),
-          aggregate_schema};
+      ARROW_ASSIGN_OR_RAISE(
+          auto aggregate_declaration,
+          internal::MakeAggregateDeclaration(
+              std::move(input.declaration), std::move(input_schema), 
measure_size,
+              std::move(aggregates), std::move(agg_src_fieldsets), 
std::move(keys),
+              std::move(key_field_ids), {}, {}, ext_set, conversion_options));
 
       return ProcessEmit(std::move(aggregate), 
std::move(aggregate_declaration),
-                         std::move(aggregate_schema));
+                         aggregate_declaration.output_schema);

Review Comment:
   I'll fix this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to