zhanglistar commented on code in PR #8550: URL: https://github.com/apache/incubator-gluten/pull/8550#discussion_r1948799571
########## cpp-ch/local-engine/Parser/aggregate_function_parser/ApproxCountDistinctFunctionParser.cpp: ########## @@ -0,0 +1,148 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <Parser/AggregateFunctionParser.h> +#include <DataTypes/DataTypeNullable.h> +#include <Poco/Logger.h> +#include <Common/logger_useful.h> +#include "DataTypes/DataTypeAggregateFunction.h" Review Comment: Use `<>` instead of `"` in include clause ########## cpp-ch/local-engine/AggregateFunctions/AggregateFunctionUniqHyperLogLogPlusPlus.h: ########## @@ -0,0 +1,3089 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <AggregateFunctions/IAggregateFunction.h> +#include <AggregateFunctions/AggregateFunctionFactory.h> +#include <Columns/ColumnVector.h> +#include <DataTypes/DataTypesNumber.h> +#include <DataTypes/IDataType.h> +#include <DataTypes/DataTypeAggregateFunction.h> +#include <Common/FieldVisitorConvertToNumber.h> +#include <Common/FieldVisitors.h> +#include <Common/HashTable/Hash.h> +#include <Common/HyperLogLogWithSmallSetOptimization.h> +#include <Parsers/NullsAction.h> +namespace DB +{ + +namespace ErrorCodes +{ +extern const int PARAMETER_OUT_OF_BOUND; +extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; +extern const int ILLEGAL_TYPE_OF_ARGUMENT; +extern const int INCORRECT_DATA; +} + +struct HyperLogLogPlusPlusData +{ + explicit HyperLogLogPlusPlusData(double relative_sd_ = 0.05) + : relative_sd(relative_sd_) + , p(static_cast<UInt64>(std::ceil(2.0 * std::log(1.106 / relative_sd) / std::log(2.0)))) + , idx_shift(64 - p) + , w_padding(1ULL << (p - 1)) + , m(1ULL << p) + , num_words(m / REGISTERS_PER_WORD + 1) + , alpha_mm(computeAlphaMM()) + { + if (p < 4) + throw Exception( + ErrorCodes::PARAMETER_OUT_OF_BOUND, + "HLL++ requires at least 4 bits for addressing instead of {}. Use a lower error, at most 39%", + p); + + if (p > 25) + throw Exception( + ErrorCodes::PARAMETER_OUT_OF_BOUND, + "HLL++ requires at most 25 bits for addressing instead of {} to avoid allocating too much memory", + p); + + // std::cout << "relative_sd:" << relative_sd << " p:" << p << " m:" << m << " num_words:" << num_words << " alpha_mm:" << alpha_mm + // << std::endl; + registers = PaddedPODArray<UInt64>(num_words, 0); // Initialize registers with zeros + } + + void serialize(WriteBuffer & buf) const + { + writeBinaryLittleEndian(relative_sd, buf); + + writeBinaryLittleEndian(registers.size(), buf); + for (const auto & word : registers) + writeBinaryLittleEndian(word, buf); + } + + void deserialize(ReadBuffer & buf) + { + Float64 relative_rd_read; + readBinaryLittleEndian(relative_rd_read, buf); + if (relative_rd_read != relative_sd) + throw Exception( + ErrorCodes::INCORRECT_DATA, "The relative standard deviation {} isn't the expected one {}", relative_rd_read, relative_sd); + + size_t registers_size = 0; + readBinaryLittleEndian(registers_size, buf); + if (registers_size != registers.size()) + throw Exception( + ErrorCodes::INCORRECT_DATA, "The number of registers {} isn't the expected one {}", registers_size, registers.size()); + + for (size_t i = 0; i < registers_size; ++i) + readBinaryLittleEndian(registers[i], buf); + } + + void add(UInt64 value) + { + // std::cout << "add:" << value << std::endl; + UInt64 x = value; + UInt64 idx = x >> idx_shift; + UInt64 w = (x << p) | w_padding; + UInt64 pw = __builtin_clzll(w) + 1; + + UInt64 word_offset = idx / REGISTERS_PER_WORD; + UInt64 word = registers[word_offset]; + + UInt64 shift = (idx - word_offset * REGISTERS_PER_WORD) * REGISTER_SIZE; + UInt64 mask = REGISTER_WORD_MASK << shift; + UInt64 midx = (word & mask) >> shift; + // std::cout << "bucket[" << std::setw(3) << std::setfill('0') << idx << "]=" << midx << " input:" << value << " pw:" << pw; + if (pw > midx) + { + // std::cout << " updated"; + registers[word_offset] = (word & ~mask) | (pw << shift); + } + // std::cout << std::endl; + } + + void merge(const HyperLogLogPlusPlusData & other) + { + size_t i = 0; // global register index + for (size_t word_i = 0; word_i < num_words; ++word_i) + { + UInt64 word1 = registers[word_i]; + UInt64 word2 = other.registers[word_i]; + UInt64 mask = REGISTER_WORD_MASK; + UInt64 merged_word = 0; + for (size_t register_i = 0; i < m && register_i < REGISTERS_PER_WORD; ++register_i, ++i) + { + merged_word |= std::max((word1 & mask), (word2 & mask)); + mask <<= REGISTER_SIZE; + } + registers[word_i] = merged_word; + } + } + + UInt64 query() const + { + double z_inverse = 0.0; + UInt64 v = 0; + + size_t i = 0; // global register index + for (size_t word_i = 0; word_i < num_words; ++word_i) + { + UInt64 word = registers[word_i]; + for (size_t register_i = 0; i < m && register_i < REGISTERS_PER_WORD; ++register_i, ++i) + { + UInt64 midx = (word >> (register_i * REGISTER_SIZE)) & REGISTER_WORD_MASK; + // std::cout << "bucket[" << std::setw(3) << std::setfill('0') << i << "]=" << midx << std::endl; + z_inverse += 1.0 / (1ULL << midx); + + if (midx == 0) + ++v; + } + } + // std::cout << "num_zero:" << v << std::endl; + + double e = alpha_mm / z_inverse; + // std::cout << "e:" << e << std::endl; + + double e_bias_corrected = e; + if (p < 19 && e < 5.0 * m) + e_bias_corrected = e - estimateBias(e); + + // std::cout << "e_bias_corrected:" << e_bias_corrected << std::endl; + + if (v > 0) + { + double h = m * std::log(static_cast<double>(m) / v); + if ((p < 19 && h <= THRESHOLDS[p - 4]) || e <= 2.5 * m) + { + // std::cout << "h:" << h << std::endl; + return static_cast<UInt64>(std::round(h)); + } + else + return static_cast<UInt64>(std::round(e_bias_corrected)); + } + else + return static_cast<UInt64>(std::round(e_bias_corrected)); + } + +private: + Float64 computeAlphaMM() const + { + // Compute alpha * m * m based on the value of m + if (p == 4) + return 0.673 * m * m; + else if (p == 5) + return 0.697 * m * m; + else if (p == 6) + return 0.709 * m * m; + else + return (0.7213 / (1 + 1.079 / m)) * m * m; + } + + + /// Estimate the bias using the raw estimates with their respective biases from the HLL++ + /// appendix. We currently use KNN interpolation to determine the bias (as suggested in the + /// paper). + double estimateBias(double e) const + { + const auto & estimates = RAW_ESTIMATE_DATA[p - 4]; + size_t num_estimates = estimates.size(); + + // Binary search to find the nearest estimate index + auto nearest_estimate_it = std::lower_bound(estimates.begin(), estimates.end(), e); + size_t nearest_estimate_index = std::distance(estimates.begin(), nearest_estimate_it); + + // Distance metric: square of the difference + auto distance = [&](size_t i) + { + double diff = e - estimates[i]; + return diff * diff; + }; + + // Adjust bounds + size_t low = nearest_estimate_index + 1 > K ? nearest_estimate_index - K + 1 : 0; + size_t high = std::min(low + K, num_estimates); + while (high < num_estimates && distance(high) < distance(low)) + { + ++low; + ++high; + } + + // Sum biases in the interval + const auto & biases = BIAS_DATA[p - 4]; + double bias_sum = 0.0; + for (size_t i = low; i < high; ++i) + bias_sum += biases[i]; + + // Calculate the bias + return bias_sum / (high - low); + } + + /// Defines the maximum relative standard deviation allowed. + const Float64 relative_sd; + + /// HLL++ uses 'p' bits for bucket addressing. + const UInt64 p; + + /// Shift used to extract the index of the register from the hashed value. + /// This assumes the use of 64-bit hashcodes. + const UInt64 idx_shift; + + /// Value to pad the 'w' value with before the number of leading zeros is determined. + const UInt64 w_padding; + + /// The number of registers used. + const UInt64 m; + + /// The number of words used to store the registers. We use Longs for storage because this is the + /// most compact way of storage; Spark aligns to 8-byte words or uses Long wrappers. + + /// We only store whole registers per word in order to prevent overly complex bitwise operations. + /// In practice this means we only use 60 out of 64 bits. + const UInt64 num_words; + + /// The pre-calculated combination of: alpha * m * m + /// 'alpha' corrects the raw cardinality estimate 'Z'. See the FlFuGaMe07 paper for its derivation. + const double alpha_mm; + + /// The number of bits that is required per register. + /// + /// This number is determined by the maximum number of leading binary zeros a hashcode can + /// produce. This is equal to the number of bits the hashcode returns. The current + /// implementation uses a 64-bit hashcode, this means 6-bits are (at most) needed to store the + /// number of leading zeros. + static constexpr UInt64 REGISTER_SIZE = 6; + static constexpr UInt64 REGISTER_WORD_MASK = (1ULL << REGISTER_SIZE) - 1; + + /// The size of a word used for storing registers: 64 Bits. + static constexpr UInt64 WORD_SIZE = 64; + + static constexpr UInt64 REGISTERS_PER_WORD = WORD_SIZE / REGISTER_SIZE; + + /// Number of points used for interpolating the bias value. + static constexpr UInt64 K = 6; + + inline static const std::vector<std::vector<double>> BIAS_DATA = { + // precision 4 + {10, Review Comment: 这里格式化下吧每行10个元素。 ########## cpp-ch/local-engine/AggregateFunctions/AggregateFunctionUniqHyperLogLogPlusPlus.h: ########## @@ -0,0 +1,3089 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <AggregateFunctions/IAggregateFunction.h> +#include <AggregateFunctions/AggregateFunctionFactory.h> +#include <Columns/ColumnVector.h> +#include <DataTypes/DataTypesNumber.h> +#include <DataTypes/IDataType.h> +#include <DataTypes/DataTypeAggregateFunction.h> +#include <Common/FieldVisitorConvertToNumber.h> +#include <Common/FieldVisitors.h> +#include <Common/HashTable/Hash.h> +#include <Common/HyperLogLogWithSmallSetOptimization.h> +#include <Parsers/NullsAction.h> +namespace DB +{ + +namespace ErrorCodes +{ +extern const int PARAMETER_OUT_OF_BOUND; +extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; +extern const int ILLEGAL_TYPE_OF_ARGUMENT; +extern const int INCORRECT_DATA; +} + +struct HyperLogLogPlusPlusData +{ + explicit HyperLogLogPlusPlusData(double relative_sd_ = 0.05) Review Comment: Use Float64 for consistency ########## cpp-ch/local-engine/Parser/AggregateFunctionParser.cpp: ########## @@ -25,6 +25,7 @@ #include <Parser/TypeParser.h> #include <Common/CHUtil.h> #include <Common/Exception.h> +#include <Common/logger_useful.h> Review Comment: Useless header -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
