github-actions[bot] commented on code in PR #31526:
URL: https://github.com/apache/doris/pull/31526#discussion_r1505531262


##########
be/src/vec/aggregate_functions/aggregate_function_foreach.h:
##########
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/AggregateFunctions/Combinators/AggregateFunctionForEach.h
+// and modified by Doris
+
+#pragma once
+
+#include "common/logging.h"
+#include "common/status.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/common/assert_cast.h"
+#include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/functions/array/function_array_utils.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+struct AggregateFunctionForEachData {
+    size_t dynamic_array_size = 0;
+    char* array_of_aggregate_datas = nullptr;
+};
+
+/** Adaptor for aggregate functions.
+  * Adding -ForEach suffix to aggregate function
+  *  will convert that aggregate function to a function, accepting arrays,
+  *  and applies aggregation for each corresponding elements of arrays 
independently,
+  *  returning arrays of aggregated values on corresponding positions.
+  *
+  * Example: sumForEach of:
+  *  [1, 2],
+  *  [3, 4, 5],
+  *  [6, 7]
+  * will return:
+  *  [10, 13, 5]
+  *
+  * TODO Allow variable number of arguments.
+  */
+class AggregateFunctionForEach : public 
IAggregateFunctionDataHelper<AggregateFunctionForEachData,
+                                                                     
AggregateFunctionForEach> {
+protected:
+    using Base =
+            IAggregateFunctionDataHelper<AggregateFunctionForEachData, 
AggregateFunctionForEach>;
+
+    AggregateFunctionPtr nested_function;
+    const size_t nested_size_of_data;
+    const size_t num_arguments;
+
+    AggregateFunctionForEachData& ensureAggregateData(AggregateDataPtr 
__restrict place,
+                                                      size_t new_size, Arena& 
arena) const {
+        AggregateFunctionForEachData& state = data(place);
+
+        /// Ensure we have aggregate states for new_size elements, allocate
+        /// from arena if needed. When reallocating, we can't copy the
+        /// states to new buffer with memcpy, because they may contain pointers
+        /// to themselves. In particular, this happens when a state contains
+        /// a PODArrayWithStackMemory, which stores small number of elements
+        /// inline. This is why we create new empty states in the new buffer,
+        /// and merge the old states to them.
+        size_t old_size = state.dynamic_array_size;
+        if (old_size < new_size) {
+            static constexpr size_t MAX_ARRAY_SIZE = 100 * 1000000000ULL;
+            if (new_size > MAX_ARRAY_SIZE) {
+                throw Exception(ErrorCode::INTERNAL_ERROR,
+                                "Suspiciously large array size ({}) in 
-ForEach aggregate function",
+                                new_size);
+            }
+
+            size_t allocation_size = 0;
+            if (common::mul_overflow(new_size, nested_size_of_data, 
allocation_size)) {
+                throw Exception(ErrorCode::INTERNAL_ERROR,
+                                "Allocation size ({} * {}) overflows in 
-ForEach aggregate "
+                                "function, but it should've been prevented by 
previous checks",
+                                new_size, nested_size_of_data);
+            }
+
+            char* old_state = state.array_of_aggregate_datas;
+
+            char* new_state =
+                    arena.aligned_alloc(allocation_size, 
nested_function->align_of_data());
+
+            size_t i;
+            try {
+                for (i = 0; i < new_size; ++i) {
+                    nested_function->create(&new_state[i * 
nested_size_of_data]);
+                }
+            } catch (...) {
+                size_t cleanup_size = i;
+
+                for (i = 0; i < cleanup_size; ++i) {
+                    nested_function->destroy(&new_state[i * 
nested_size_of_data]);
+                }
+
+                throw;
+            }
+
+            for (i = 0; i < old_size; ++i) {
+                nested_function->merge(&new_state[i * nested_size_of_data],
+                                       &old_state[i * nested_size_of_data], 
&arena);
+            }
+
+            state.array_of_aggregate_datas = new_state;
+            state.dynamic_array_size = new_size;
+        }
+
+        return state;
+    }
+
+public:
+    constexpr static auto AGG_FOREACH_SUFFIX = "_foreach";
+    AggregateFunctionForEach(AggregateFunctionPtr nested_function_, const 
DataTypes& arguments)
+            : Base(arguments),
+              nested_function {std::move(nested_function_)},
+              nested_size_of_data(nested_function->size_of_data()),
+              num_arguments(arguments.size()) {
+        if (arguments.empty()) {
+            throw Exception(ErrorCode::INTERNAL_ERROR,
+                            "Aggregate function {} require at least one 
argument", get_name());
+        }
+    }
+    void set_version(const int version_) override {

Review Comment:
   warning: method 'set_version' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
       static void set_version(const int version_) override {
   ```
   



##########
be/src/vec/aggregate_functions/aggregate_function_foreach.h:
##########
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/AggregateFunctions/Combinators/AggregateFunctionForEach.h
+// and modified by Doris
+
+#pragma once
+
+#include "common/logging.h"
+#include "common/status.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/common/assert_cast.h"
+#include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/functions/array/function_array_utils.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+struct AggregateFunctionForEachData {
+    size_t dynamic_array_size = 0;
+    char* array_of_aggregate_datas = nullptr;
+};
+
+/** Adaptor for aggregate functions.
+  * Adding -ForEach suffix to aggregate function
+  *  will convert that aggregate function to a function, accepting arrays,
+  *  and applies aggregation for each corresponding elements of arrays 
independently,
+  *  returning arrays of aggregated values on corresponding positions.
+  *
+  * Example: sumForEach of:
+  *  [1, 2],
+  *  [3, 4, 5],
+  *  [6, 7]
+  * will return:
+  *  [10, 13, 5]
+  *
+  * TODO Allow variable number of arguments.
+  */
+class AggregateFunctionForEach : public 
IAggregateFunctionDataHelper<AggregateFunctionForEachData,
+                                                                     
AggregateFunctionForEach> {
+protected:
+    using Base =
+            IAggregateFunctionDataHelper<AggregateFunctionForEachData, 
AggregateFunctionForEach>;
+
+    AggregateFunctionPtr nested_function;
+    const size_t nested_size_of_data;
+    const size_t num_arguments;
+
+    AggregateFunctionForEachData& ensureAggregateData(AggregateDataPtr 
__restrict place,
+                                                      size_t new_size, Arena& 
arena) const {
+        AggregateFunctionForEachData& state = data(place);
+
+        /// Ensure we have aggregate states for new_size elements, allocate
+        /// from arena if needed. When reallocating, we can't copy the
+        /// states to new buffer with memcpy, because they may contain pointers
+        /// to themselves. In particular, this happens when a state contains
+        /// a PODArrayWithStackMemory, which stores small number of elements
+        /// inline. This is why we create new empty states in the new buffer,
+        /// and merge the old states to them.
+        size_t old_size = state.dynamic_array_size;
+        if (old_size < new_size) {
+            static constexpr size_t MAX_ARRAY_SIZE = 100 * 1000000000ULL;
+            if (new_size > MAX_ARRAY_SIZE) {
+                throw Exception(ErrorCode::INTERNAL_ERROR,
+                                "Suspiciously large array size ({}) in 
-ForEach aggregate function",
+                                new_size);
+            }
+
+            size_t allocation_size = 0;
+            if (common::mul_overflow(new_size, nested_size_of_data, 
allocation_size)) {
+                throw Exception(ErrorCode::INTERNAL_ERROR,
+                                "Allocation size ({} * {}) overflows in 
-ForEach aggregate "
+                                "function, but it should've been prevented by 
previous checks",
+                                new_size, nested_size_of_data);
+            }
+
+            char* old_state = state.array_of_aggregate_datas;
+
+            char* new_state =
+                    arena.aligned_alloc(allocation_size, 
nested_function->align_of_data());
+
+            size_t i;
+            try {
+                for (i = 0; i < new_size; ++i) {
+                    nested_function->create(&new_state[i * 
nested_size_of_data]);
+                }
+            } catch (...) {
+                size_t cleanup_size = i;
+
+                for (i = 0; i < cleanup_size; ++i) {
+                    nested_function->destroy(&new_state[i * 
nested_size_of_data]);
+                }
+
+                throw;
+            }
+
+            for (i = 0; i < old_size; ++i) {
+                nested_function->merge(&new_state[i * nested_size_of_data],
+                                       &old_state[i * nested_size_of_data], 
&arena);
+            }
+
+            state.array_of_aggregate_datas = new_state;
+            state.dynamic_array_size = new_size;
+        }
+
+        return state;
+    }
+
+public:
+    constexpr static auto AGG_FOREACH_SUFFIX = "_foreach";
+    AggregateFunctionForEach(AggregateFunctionPtr nested_function_, const 
DataTypes& arguments)
+            : Base(arguments),
+              nested_function {std::move(nested_function_)},
+              nested_size_of_data(nested_function->size_of_data()),
+              num_arguments(arguments.size()) {
+        if (arguments.empty()) {
+            throw Exception(ErrorCode::INTERNAL_ERROR,
+                            "Aggregate function {} require at least one 
argument", get_name());
+        }
+    }
+    void set_version(const int version_) override {
+        Base::set_version(version_);
+        nested_function->set_version(version_);
+    }
+
+    String get_name() const override { return nested_function->get_name() + 
AGG_FOREACH_SUFFIX; }
+
+    DataTypePtr get_return_type() const override {
+        // For functions like "count," the return value may not actually be 
nullable.
+        // Here we handle everything as nullable for the sake of convenience.
+        return 
std::make_shared<DataTypeArray>(make_nullable(nested_function->get_return_type()));
+    }
+
+    void destroy(AggregateDataPtr __restrict place) const noexcept override {

Review Comment:
   warning: pointer parameter 'place' can be pointer to const 
[readability-non-const-parameter]
   
   ```suggestion
       void destroy(const AggregateDataPtr __restrict place) const noexcept override {
   ```
   



##########
be/src/vec/aggregate_functions/aggregate_function_foreach.h:
##########
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/AggregateFunctions/Combinators/AggregateFunctionForEach.h
+// and modified by Doris
+
+#pragma once
+
+#include "common/logging.h"
+#include "common/status.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/common/assert_cast.h"
+#include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/functions/array/function_array_utils.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+struct AggregateFunctionForEachData {
+    size_t dynamic_array_size = 0;
+    char* array_of_aggregate_datas = nullptr;
+};
+
+/** Adaptor for aggregate functions.
+  * Adding -ForEach suffix to aggregate function
+  *  will convert that aggregate function to a function, accepting arrays,
+  *  and applies aggregation for each corresponding elements of arrays 
independently,
+  *  returning arrays of aggregated values on corresponding positions.
+  *
+  * Example: sumForEach of:
+  *  [1, 2],
+  *  [3, 4, 5],
+  *  [6, 7]
+  * will return:
+  *  [10, 13, 5]
+  *
+  * TODO Allow variable number of arguments.
+  */
+class AggregateFunctionForEach : public 
IAggregateFunctionDataHelper<AggregateFunctionForEachData,
+                                                                     
AggregateFunctionForEach> {
+protected:
+    using Base =
+            IAggregateFunctionDataHelper<AggregateFunctionForEachData, 
AggregateFunctionForEach>;
+
+    AggregateFunctionPtr nested_function;
+    const size_t nested_size_of_data;
+    const size_t num_arguments;
+
+    AggregateFunctionForEachData& ensureAggregateData(AggregateDataPtr 
__restrict place,

Review Comment:
   warning: pointer parameter 'place' can be pointer to const 
[readability-non-const-parameter]
   
   ```suggestion
       AggregateFunctionForEachData& ensureAggregateData(const AggregateDataPtr __restrict place,
   ```
   



##########
be/src/vec/aggregate_functions/aggregate_function_simple_factory.h:
##########
@@ -78,6 +86,21 @@ class AggregateFunctionSimpleFactory {
         }
     }
 
+    void register_foreach_function_combinator(const Creator& creator, const 
std::string& suffix,

Review Comment:
   warning: method 'register_foreach_function_combinator' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
       static void register_foreach_function_combinator(const Creator& creator, const std::string& suffix,
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to