This is an automated email from the ASF dual-hosted git repository. lwz9103 pushed a commit to branch liquid in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
commit ee10b53cc62354244aa19768dc231cb3a8c4ebd2 Author: lwz9103 <[email protected]> AuthorDate: Thu Feb 1 10:42:43 2024 +0800 support bitmap compatible with ke - 20250402 Fix conflict due to https://github.com/apache/incubator-gluten/pull/9138 (cherry picked from commit 681fb2ac4657f2a3f6b13fd2c234583ab354f611) --- .../AggregateFunctions/KeAggregateBitmapData.h | 371 +++++++++++++++++ .../KeAggregateBitmapFunctions.cpp | 122 ++++++ .../KeAggregateBitmapFunctions.h | 438 +++++++++++++++++++++ cpp-ch/local-engine/Common/CHUtil.cpp | 2 + cpp-ch/local-engine/Functions/CMakeLists.txt | 1 + .../Functions/SparkFunctionBitmapCardinality.cpp | 85 ++++ .../Functions/SparkFunctionBitmapCardinality.h | 50 +++ .../CommonAggregateFunctionParser.cpp | 4 + .../Storages/ReadBufferFromJavaBitmap.cpp | 53 +++ .../Storages/ReadBufferFromJavaBitmap.h | 39 ++ 10 files changed, 1165 insertions(+) diff --git a/cpp-ch/local-engine/AggregateFunctions/KeAggregateBitmapData.h b/cpp-ch/local-engine/AggregateFunctions/KeAggregateBitmapData.h new file mode 100644 index 0000000000..e70c6817c4 --- /dev/null +++ b/cpp-ch/local-engine/AggregateFunctions/KeAggregateBitmapData.h @@ -0,0 +1,371 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#include <stdio.h> +#include <stdlib.h> +#include <time.h> +#include <IO/ReadHelpers.h> +#include <IO/WriteHelpers.h> +#include <Common/PODArray.h> +#include "Storages/ReadBufferFromJavaBitmap.h" + +// Include this header last, because it is an auto-generated dump of questionable +// garbage that breaks the build (e.g. it changes _POSIX_C_SOURCE). +// TODO: find out what it is. On github, they have proper interface headers like +// this one: https://github.com/RoaringBitmap/CRoaring/blob/master/include/roaring/roaring.h +#include <roaring.hh> +#include <roaring64map.hh> + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int TOO_LARGE_ARRAY_SIZE; + extern const int INCORRECT_DATA; +} +} + +namespace local_engine +{ +using namespace DB; + +/** + * Roaring bitmap data. + * For a description of the roaring_bitmap_t, see: https://github.com/RoaringBitmap/CRoaring + */ +template <typename T> +class KeRoaringBitmapData : private boost::noncopyable +{ +public: + using RoaringBitmap = std::conditional_t<sizeof(T) >= 8, roaring::Roaring64Map, roaring::Roaring>; + using Value = UInt64; + RoaringBitmap roaring_bitmap; + + void add(T value) { roaring_bitmap.add(static_cast<Value>(value)); } + + UInt64 size() const { return roaring_bitmap.cardinality(); } + + void merge(const KeRoaringBitmapData & r1) { roaring_bitmap |= r1.roaring_bitmap; } + + void read(DB::ReadBuffer & in) + { + static thread_local String buf; + size_t size; + readVarUInt(size, in); + + static constexpr size_t max_size = 100_GiB; + + if (size == 0) + throw Exception(ErrorCodes::INCORRECT_DATA, "Incorrect size (0) in groupBitmap."); + if (size > max_size) + throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Too large array size in groupBitmap (maximum: {})", max_size); + + if (in.available() > size) + { + roaring_bitmap = RoaringBitmap::readSafe(in.position(), size); + in.ignore(size); + } + else + { + buf.reserve(size); + in.readStrict(buf.data(), size); + roaring_bitmap = 
RoaringBitmap::readSafe(buf.data(), size); + } + } + + void write(DB::WriteBuffer & out) const + { + auto size = roaring_bitmap.getSizeInBytes(); + writeVarUInt(size, out); + if (out.available() > size) + { + roaring_bitmap.write(out.position()); + out.position() += size; + } + else + { + std::unique_ptr<char[]> buf(new char[size]); + roaring_bitmap.write(buf.get()); + out.write(buf.get(), size); + } + } + + void to_ke_bitmap_data(DB::WriteBuffer & ke_bitmap_data_buffer) const + { + auto size = roaring_bitmap.getSizeInBytes(); + + std::unique_ptr<char[]> buf(new char[size]); + roaring_bitmap.write(buf.get()); + + Int8 signedLongs = 0; + writeBinary(signedLongs, ke_bitmap_data_buffer); + + writeBinary(static_cast<unsigned char>(buf.get()[3]), ke_bitmap_data_buffer); + writeBinary(static_cast<unsigned char>(buf.get()[2]), ke_bitmap_data_buffer); + writeBinary(static_cast<unsigned char>(buf.get()[1]), ke_bitmap_data_buffer); + writeBinary(static_cast<unsigned char>(buf.get()[0]), ke_bitmap_data_buffer); + + auto bitmap_data = buf.get() + 8; + ke_bitmap_data_buffer.write(bitmap_data, size - 8); + } + + /** + * Computes the intersection between two bitmaps + */ + void rb_and(const KeRoaringBitmapData & r1) /// NOLINT + { + roaring_bitmap &= r1.roaring_bitmap; + } + + /** + * Computes the union between two bitmaps. + */ + void rb_or(const KeRoaringBitmapData & r1) + { + merge(r1); /// NOLINT + } + + /** + * Computes the symmetric difference (xor) between two bitmaps. + */ + void rb_xor(const KeRoaringBitmapData & r1) /// NOLINT + { + roaring_bitmap ^= r1.roaring_bitmap; + } + + /** + * Computes the difference (andnot) between two bitmaps + */ + void rb_andnot(const KeRoaringBitmapData & r1) /// NOLINT + { + roaring_bitmap -= r1.roaring_bitmap; + } + + /** + * Computes the cardinality of the intersection between two bitmaps. 
+ */ + UInt64 rb_and_cardinality(const KeRoaringBitmapData & r1) const /// NOLINT + { + return (roaring_bitmap & r1.roaring_bitmap).cardinality(); + } + + /** + * Computes the cardinality of the union between two bitmaps. + */ + UInt64 rb_or_cardinality(const KeRoaringBitmapData & r1) const /// NOLINT + { + UInt64 c1 = size(); + UInt64 c2 = r1.size(); + UInt64 inter = rb_and_cardinality(r1); + return c1 + c2 - inter; + } + + /** + * Computes the cardinality of the symmetric difference (xor) between two bitmaps. + */ + UInt64 rb_xor_cardinality(const KeRoaringBitmapData & r1) const /// NOLINT + { + UInt64 c1 = size(); + UInt64 c2 = r1.size(); + UInt64 inter = rb_and_cardinality(r1); + return c1 + c2 - 2 * inter; + } + + /** + * Computes the cardinality of the difference (andnot) between two bitmaps. + */ + UInt64 rb_andnot_cardinality(const KeRoaringBitmapData & r1) const /// NOLINT + { + UInt64 c1 = size(); + UInt64 inter = rb_and_cardinality(r1); + return c1 - inter; + } + + /** + * Return 1 if the two bitmaps contain the same elements. + */ + UInt8 rb_equals(const KeRoaringBitmapData & r1) /// NOLINT + { + return roaring_bitmap == r1.roaring_bitmap; + } + + /** + * Check whether two bitmaps intersect. + * Intersection with an empty set is always 0 (consistent with hasAny). + */ + UInt8 rb_intersect(const KeRoaringBitmapData & r1) const /// NOLINT + { + if ((roaring_bitmap & r1.roaring_bitmap).cardinality() > 0) + return 1; + return 0; + } + + /** + * Check whether the argument is the subset of this set. + * Empty set is a subset of any other set (consistent with hasAll). + * It's used in subset and currently only supports comparing the same type + */ + UInt8 rb_is_subset(const KeRoaringBitmapData & r1) const /// NOLINT + { + if (!r1.roaring_bitmap.isSubset(roaring_bitmap)) + return 0; + return 1; + } + + /** + * Check whether this bitmap contains the argument. 
+ */ + UInt8 rb_contains(UInt64 x) const /// NOLINT + { + if (!std::is_same_v<T, UInt64> && x > rb_max()) + return 0; + + UInt32 high_bytes = uint32_t(x >> 32); + UInt32 high_bytes_new + = ((high_bytes >> 24)) | ((high_bytes >> 8) & 0xFF00) | ((high_bytes << 8) & 0xFF0000) | ((high_bytes << 24)); + UInt64 value = (uint64_t(high_bytes_new) << 32) | uint64_t(uint32_t(x)); + + return roaring_bitmap.contains(value); + } + + /** + * Convert elements to integer array, return number of elements + */ + template <typename Element> + UInt64 rb_to_array(PaddedPODArray<Element> & res) const /// NOLINT + { + UInt64 count = 0; + for (auto it = roaring_bitmap.begin(); it != roaring_bitmap.end(); ++it) + { + // reverse high 4 bytes to Little-endian + Int64 original_value = *it; + UInt32 high_bytes = uint32_t(original_value >> 32); + UInt32 high_bytes_new + = ((high_bytes >> 24)) | ((high_bytes >> 8) & 0xFF00) | ((high_bytes << 8) & 0xFF0000) | ((high_bytes << 24)); + Int64 value = (uint64_t(high_bytes_new) << 32) | uint64_t(uint32_t(original_value)); + res.emplace_back(value); + ++count; + } + return count; + } + + /** + * Return new set with specified range (not include the range_end) + * It's used in subset and currently only support UInt32 + */ + UInt64 rb_range(UInt64 range_start, UInt64 range_end, KeRoaringBitmapData & r1) const /// NOLINT + { + UInt64 count = 0; + if (range_start >= range_end) + return count; + + for (auto it = roaring_bitmap.begin(); it != roaring_bitmap.end(); ++it) + { + if (*it < range_start) + continue; + + if (*it < range_end) + { + r1.add(*it); + ++count; + } + else + break; + } + return count; + } + + /** + * Return new set of the smallest `limit` values in set which is no less than `range_start`. 
+ * It's used in subset and currently only support UInt32 + */ + UInt64 rb_limit(UInt64 range_start, UInt64 limit, KeRoaringBitmapData & r1) const /// NOLINT + { + if (limit == 0) + return 0; + + UInt64 count = 0; + for (auto it = roaring_bitmap.begin(); it != roaring_bitmap.end(); ++it) + { + if (*it < range_start) + continue; + + if (count < limit) + { + r1.add(*it); + ++count; + } + else + break; + } + return count; + } + + UInt64 rb_offset_limit(UInt64 offset, UInt64 limit, KeRoaringBitmapData & r1) const /// NOLINT + { + if (limit == 0 || offset >= size()) + return 0; + + UInt64 count = 0; + UInt64 offset_count = 0; + auto it = roaring_bitmap.begin(); + for (; it != roaring_bitmap.end() && offset_count < offset; ++it) + ++offset_count; + + for (; it != roaring_bitmap.end() && count < limit; ++it, ++count) + r1.add(*it); + return count; + } + + UInt64 rb_min() const /// NOLINT + { + return roaring_bitmap.minimum(); + } + + UInt64 rb_max() const /// NOLINT + { + return roaring_bitmap.maximum(); + } + + /** + * Replace value. 
+ * It's used in transform and currently can only support UInt32 + */ + void rb_replace(const UInt64 * from_vals, const UInt64 * to_vals, size_t num) /// NOLINT + { + for (size_t i = 0; i < num; ++i) + { + if (from_vals[i] == to_vals[i]) + continue; + bool changed = roaring_bitmap.removeChecked(static_cast<Value>(from_vals[i])); + if (changed) + roaring_bitmap.add(static_cast<Value>(to_vals[i])); + } + } +}; + +template <typename T> +struct KeAggregateBitmapData +{ + // If false, all bitmap operations will be treated as merge to initialize the state + bool init = false; + KeRoaringBitmapData<T> roaring_bitmap; + static const char * name() { return "keBitmap"; } +}; + +} diff --git a/cpp-ch/local-engine/AggregateFunctions/KeAggregateBitmapFunctions.cpp b/cpp-ch/local-engine/AggregateFunctions/KeAggregateBitmapFunctions.cpp new file mode 100644 index 0000000000..2747895af2 --- /dev/null +++ b/cpp-ch/local-engine/AggregateFunctions/KeAggregateBitmapFunctions.cpp @@ -0,0 +1,122 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#include <AggregateFunctions/AggregateFunctionFactory.h> +#include <AggregateFunctions/FactoryHelpers.h> +#include <DataTypes/DataTypeAggregateFunction.h> + +#include <AggregateFunctions/KeAggregateBitmapFunctions.h> + + +namespace DB +{ +struct Settings; + +namespace ErrorCodes +{ + extern const int ILLEGAL_TYPE_OF_ARGUMENT; +} +} + +namespace local_engine +{ +using namespace DB; + +AggregateFunctionPtr +createKeAggregateBitmapOrCardinalityFunction(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *) +{ + assertNoParameters(name, parameters); + assertUnary(name, argument_types); + DataTypePtr argument_type_ptr = argument_types[0]; + AggregateFunctionPtr res( + new local_engine::KeAggregateBitmapOrCardinality<Int64, local_engine::KeAggregateBitmapData<Int64>>(argument_type_ptr)); + + if (!res) + throw Exception( + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, + "Illegal type {} of argument for aggregate function {}", + argument_types[0]->getName(), + name); + + return res; +} + +AggregateFunctionPtr +createKeAggregateBitmapOrDataFunction(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *) +{ + assertNoParameters(name, parameters); + assertUnary(name, argument_types); + DataTypePtr argument_type_ptr = argument_types[0]; + AggregateFunctionPtr res( + new local_engine::KeAggregateBitmapOr<std::string, local_engine::KeAggregateBitmapData<Int64>>(argument_type_ptr)); + + if (!res) + throw Exception( + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, + "Illegal type {} of argument for aggregate function {}", + argument_types[0]->getName(), + name); + + return res; +} + +AggregateFunctionPtr createKeAggregateBitmapAndValueFunction( + const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *) +{ + assertNoParameters(name, parameters); + assertUnary(name, argument_types); + DataTypePtr argument_type_ptr = argument_types[0]; + 
AggregateFunctionPtr res( + new local_engine::KeAggregateBitmapAndValue<Int64, local_engine::KeAggregateBitmapData<Int64>>(argument_type_ptr)); + + if (!res) + throw Exception( + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, + "Illegal type {} of argument for aggregate function {}", + argument_types[0]->getName(), + name); + + return res; +} + +AggregateFunctionPtr createKeAggregateBitmapAndIdsFunction( + const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *) +{ + assertNoParameters(name, parameters); + assertUnary(name, argument_types); + DataTypePtr argument_type_ptr = argument_types[0]; + AggregateFunctionPtr res( + new local_engine::KeAggregateBitmapAndIds<Int64, local_engine::KeAggregateBitmapData<Int64>>(argument_type_ptr)); + + if (!res) + throw Exception( + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, + "Illegal type {} of argument for aggregate function {}", + argument_types[0]->getName(), + name); + + return res; +} + +void registerKeAggregateFunctionsBitmap(AggregateFunctionFactory & factory) +{ + AggregateFunctionProperties properties = { .returns_default_when_only_null = true }; + factory.registerFunction("ke_bitmap_or_cardinality", {createKeAggregateBitmapOrCardinalityFunction, properties}, AggregateFunctionFactory::CaseInsensitive); + factory.registerFunction("ke_bitmap_or_data", {createKeAggregateBitmapOrDataFunction, properties}, AggregateFunctionFactory::CaseInsensitive); + factory.registerFunction("ke_bitmap_and_value", { createKeAggregateBitmapAndValueFunction, properties}, AggregateFunctionFactory::CaseInsensitive); + factory.registerFunction("ke_bitmap_and_ids", {createKeAggregateBitmapAndIdsFunction, properties}, AggregateFunctionFactory::CaseInsensitive); +} +} diff --git a/cpp-ch/local-engine/AggregateFunctions/KeAggregateBitmapFunctions.h b/cpp-ch/local-engine/AggregateFunctions/KeAggregateBitmapFunctions.h new file mode 100644 index 0000000000..f475cf174e --- /dev/null +++ 
b/cpp-ch/local-engine/AggregateFunctions/KeAggregateBitmapFunctions.h @@ -0,0 +1,438 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#pragma once + +#include <AggregateFunctions/AggregateFunctionFactory.h> +#include <AggregateFunctions/IAggregateFunction.h> +#include <AggregateFunctions/Combinators/AggregateFunctionNull.h> +#include <Columns/ColumnAggregateFunction.h> +#include <Columns/ColumnString.h> +#include <Columns/ColumnVector.h> +#include <DataTypes/DataTypeArray.h> +#include <DataTypes/DataTypeString.h> +#include <DataTypes/DataTypesNumber.h> +#include <Storages/ReadBufferFromJavaBitmap.h> +#include <AggregateFunctions/KeAggregateBitmapData.h> +#include <Common/assert_cast.h> + +namespace local_engine +{ + +using namespace DB; + +// For handle null values +template <bool result_is_nullable, bool serialize_flag> +class SparkAggregateBitmapNullUnary final + : public AggregateFunctionNullBase<result_is_nullable, serialize_flag, + SparkAggregateBitmapNullUnary<result_is_nullable, serialize_flag>> +{ +public: + SparkAggregateBitmapNullUnary(AggregateFunctionPtr nested_function_, const DataTypes & arguments, const Array & params) + : AggregateFunctionNullBase<result_is_nullable, serialize_flag, + 
SparkAggregateBitmapNullUnary<result_is_nullable, serialize_flag>>(std::move(nested_function_), arguments, params) + { + } + + void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override + { + const ColumnNullable * column = assert_cast<const ColumnNullable *>(columns[0]); + if (column->isNullAt(row_num)) { + return; + } + const IColumn * nested_column = &column->getNestedColumn(); + this->nested_function->add(place, &nested_column, row_num, arena); + } +}; + +template <typename T, typename Data> +class KeAggregateBitmapOrCardinality final : public IAggregateFunctionDataHelper<Data, KeAggregateBitmapOrCardinality<T, Data>> +{ +public: + explicit KeAggregateBitmapOrCardinality(const DB::DataTypePtr & type) + : IAggregateFunctionDataHelper<Data, KeAggregateBitmapOrCardinality<T, Data>>({type}, {}, createResultType()) + { + } + + ~KeAggregateBitmapOrCardinality() override + { + } + + String getName() const override { return "ke_bitmap_or_cardinality"; } + + static DataTypePtr createResultType() { return std::make_shared<DataTypeNumber<T>>(); } + + bool allocatesMemoryInArena() const override { return false; } + + AggregateFunctionPtr getOwnNullAdapter( + const AggregateFunctionPtr & nested_function, const DataTypes & types, const Array & params, const AggregateFunctionProperties & /* properties */) const override + { + return std::make_shared<SparkAggregateBitmapNullUnary<false, false>>(nested_function, types, params); + } + + void create(AggregateDataPtr __restrict place) const override + { + new (place) Data; + } + + void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override + { + Data & data_lhs = this->data(place); + auto bitmap_data = assert_cast<const ColumnString &>(*columns[0]).getDataAt(row_num); + + if (!data_lhs.init) + { + data_lhs.init = true; + // Null data will be skip in AggregateFunctionNullUnary.add() + if (!bitmap_data.empty() && 
bitmap_data.data) + { + auto charBuff = std::make_unique<ReadBufferFromJavaBitmap>(const_cast<char *>(bitmap_data.data), bitmap_data.size); + data_lhs.roaring_bitmap.read(*charBuff); + } + } + else + { + if (!bitmap_data.empty()) + { + auto data_rhs = std::make_unique<Data>(); + auto charBuff = std::make_unique<ReadBufferFromJavaBitmap>(const_cast<char *>(bitmap_data.data), bitmap_data.size); + data_rhs->roaring_bitmap.read(*charBuff); + data_lhs.roaring_bitmap.merge(data_rhs->roaring_bitmap); + } + } + } + + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override + { + Data & data_lhs = this->data(place); + const Data & data_rhs = this->data(rhs); + + if (!data_rhs.init) + return; + + if (!data_lhs.init) + { + data_lhs.init = true; + } + data_lhs.roaring_bitmap.merge(data_rhs.roaring_bitmap); + } + + bool isVersioned() const override { return false; } + + void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override + { + this->data(place).roaring_bitmap.write(buf); + } + + void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena *) const override + { + this->data(place).init = true; + this->data(place).roaring_bitmap.read(buf); + } + + void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override + { + assert_cast<ColumnVector<T> &>(to).getData().push_back(static_cast<T>(this->data(place).roaring_bitmap.size())); + } +}; + +template <typename T, typename Data> +class KeAggregateBitmapOr final : public IAggregateFunctionDataHelper<Data, KeAggregateBitmapOr<T, Data>> +{ +public: + explicit KeAggregateBitmapOr(const DB::DataTypePtr & type) + : IAggregateFunctionDataHelper<Data, KeAggregateBitmapOr<T, Data>>({type}, {}, createResultType()) + { + } + + String getName() const override { return "ke_bitmap_or_data"; } + + static DataTypePtr createResultType() { return 
std::make_shared<DataTypeString>(); } + + bool allocatesMemoryInArena() const override { return false; } + + AggregateFunctionPtr getOwnNullAdapter( + const AggregateFunctionPtr & nested_function, const DataTypes & types, const Array & params, const AggregateFunctionProperties & /*properties*/) const override + { + return std::make_shared<SparkAggregateBitmapNullUnary<false, false>>(nested_function, types, params); + } + + void create(AggregateDataPtr __restrict place) const override + { + new (place) Data; + } + + void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override + { + Data & data_lhs = this->data(place); + auto bitmap_data = assert_cast<const ColumnString &>(*columns[0]).getDataAt(row_num); + + if (!data_lhs.init) + { + data_lhs.init = true; + // Null data will be skip in AggregateFunctionNullUnary.add() + if (!bitmap_data.empty()) + { + auto charBuff = std::make_unique<ReadBufferFromJavaBitmap>(const_cast<char *>(bitmap_data.data), bitmap_data.size); + data_lhs.roaring_bitmap.read(*charBuff); + } + } + else + { + if (!bitmap_data.empty()) + { + auto data_rhs = std::make_unique<Data>(); + auto charBuff = std::make_unique<ReadBufferFromJavaBitmap>(const_cast<char *>(bitmap_data.data), bitmap_data.size); + data_rhs->roaring_bitmap.read(*charBuff); + data_lhs.roaring_bitmap.rb_or(data_rhs->roaring_bitmap); + } + } + } + + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override + { + Data & data_lhs = this->data(place); + const Data & data_rhs = this->data(rhs); + + if (!data_rhs.init) + return; + + if (!data_lhs.init) + { + data_lhs.init = true; + } + data_lhs.roaring_bitmap.rb_or(data_rhs.roaring_bitmap); + } + + bool isVersioned() const override { return false; } + + void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override + { + this->data(place).roaring_bitmap.write(buf); + } + + void 
deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena *) const override + { + this->data(place).init = true; + this->data(place).roaring_bitmap.read(buf); + } + + void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override + { + std::string str; + auto write_buffer = std::make_unique<WriteBufferFromVector<std::string>>(str); + this->data(place).roaring_bitmap.to_ke_bitmap_data(*write_buffer); + assert_cast<ColumnString &>(to).insert(str); + } +}; + +template <typename T, typename Data> +class KeAggregateBitmapAndValue final : public IAggregateFunctionDataHelper<Data, KeAggregateBitmapAndValue<T, Data>> +{ +public: + explicit KeAggregateBitmapAndValue(const DB::DataTypePtr & type) + : IAggregateFunctionDataHelper<Data, KeAggregateBitmapAndValue<T, Data>>({type}, {}, createResultType()) + { + } + + String getName() const override { return "ke_bitmap_and_value"; } + + static DataTypePtr createResultType() { return std::make_shared<DataTypeNumber<T>>(); } + + bool allocatesMemoryInArena() const override { return false; } + + AggregateFunctionPtr getOwnNullAdapter( + const AggregateFunctionPtr & nested_function, const DataTypes & types, const Array & params, const AggregateFunctionProperties & /*properties*/) const override + { + return std::make_shared<SparkAggregateBitmapNullUnary<false, false>>(nested_function, types, params); + } + + void create(AggregateDataPtr __restrict place) const override + { + new (place) Data; + } + + void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override + { + Data & data_lhs = this->data(place); + auto bitmap_data = assert_cast<const ColumnString &>(*columns[0]).getDataAt(row_num); + + if (!data_lhs.init) + { + data_lhs.init = true; + // Null data will be skip in AggregateFunctionNullUnary.add() + if (!bitmap_data.empty()) + { + auto charBuff = 
std::make_unique<ReadBufferFromJavaBitmap>(const_cast<char *>(bitmap_data.data), bitmap_data.size); + data_lhs.roaring_bitmap.read(*charBuff); + } + } + else + { + auto data_rhs = std::make_unique<Data>(); + if (!bitmap_data.empty()) + { + auto charBuff = std::make_unique<ReadBufferFromJavaBitmap>(const_cast<char *>(bitmap_data.data), bitmap_data.size); + data_rhs->roaring_bitmap.read(*charBuff); + } + data_lhs.roaring_bitmap.rb_and(data_rhs->roaring_bitmap); + } + } + + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override + { + Data & data_lhs = this->data(place); + const Data & data_rhs = this->data(rhs); + + if (!data_rhs.init) + return; + + if (!data_lhs.init) + { + data_lhs.init = true; + data_lhs.roaring_bitmap.merge(data_rhs.roaring_bitmap); + } + else + { + data_lhs.roaring_bitmap.rb_and(data_rhs.roaring_bitmap); + } + } + + bool isVersioned() const override { return false; } + + void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override + { + this->data(place).roaring_bitmap.write(buf); + } + + void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena *) const override + { + this->data(place).init = true; + this->data(place).roaring_bitmap.read(buf); + } + + void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override + { + assert_cast<ColumnVector<T> &>(to).getData().push_back(static_cast<T>(this->data(place).roaring_bitmap.size())); + } +}; + +template <typename T, typename Data> +class KeAggregateBitmapAndIds final : public IAggregateFunctionDataHelper<Data, KeAggregateBitmapAndIds<T, Data>> +{ +public: + explicit KeAggregateBitmapAndIds(const DB::DataTypePtr & type) + : IAggregateFunctionDataHelper<Data, KeAggregateBitmapAndIds<T, Data>>({type}, {}, createResultType()) + { + } + + String getName() const override { return "ke_bitmap_and_ids"; } + + static 
DataTypePtr createResultType() { return std::make_shared<DataTypeArray>(std::make_shared<DataTypeNumber<T>>()); } + + bool allocatesMemoryInArena() const override { return false; } + + AggregateFunctionPtr getOwnNullAdapter( + const AggregateFunctionPtr & nested_function, const DataTypes & types, const Array & params, const AggregateFunctionProperties & /*properties*/) const override + { + return std::make_shared<SparkAggregateBitmapNullUnary<false, false>>(nested_function, types, params); + } + + void create(AggregateDataPtr __restrict place) const override + { + new (place) Data; + } + + void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override + { + Data & data_lhs = this->data(place); + auto bitmap_data = assert_cast<const ColumnString &>(*columns[0]).getDataAt(row_num); + + if (!data_lhs.init) + { + data_lhs.init = true; + // Null data will be skip in AggregateFunctionNullUnary.add() + if (!bitmap_data.empty()) + { + auto charBuff = std::make_unique<ReadBufferFromJavaBitmap>(const_cast<char *>(bitmap_data.data), bitmap_data.size); + data_lhs.roaring_bitmap.read(*charBuff); + } + } + else + { + auto data_rhs = std::make_unique<Data>(); + if (!bitmap_data.empty()) + { + auto charBuff = std::make_unique<ReadBufferFromJavaBitmap>(const_cast<char *>(bitmap_data.data), bitmap_data.size); + data_rhs->roaring_bitmap.read(*charBuff); + } + data_lhs.roaring_bitmap.rb_and(data_rhs->roaring_bitmap); + } + } + + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override + { + Data & data_lhs = this->data(place); + const Data & data_rhs = this->data(rhs); + + if (!data_rhs.init) + return; + + if (!data_lhs.init) + { + data_lhs.init = true; + data_lhs.roaring_bitmap.merge(data_rhs.roaring_bitmap); + } + else + { + data_lhs.roaring_bitmap.rb_and(data_rhs.roaring_bitmap); + } + } + + bool isVersioned() const override { return false; } + + void serialize(ConstAggregateDataPtr __restrict 
place, WriteBuffer & buf, std::optional<size_t> /* version */) const override + { + this->data(place).roaring_bitmap.write(buf); + } + + void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena *) const override + { + this->data(place).init = true; + this->data(place).roaring_bitmap.read(buf); + } + + void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override + { + UInt64 cardinality = this->data(place).roaring_bitmap.size(); + + if (cardinality > 10000000) + { + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "There are too many returned data ({}) for the bitmap.", cardinality); + } + else + { + auto & arr_to = assert_cast<ColumnArray &>(to); + ColumnArray::Offsets & offsets_to = arr_to.getOffsets(); + PaddedPODArray<T> & res_data = typeid_cast<ColumnVector<T> &>(arr_to.getData()).getData(); + UInt64 count = this->data(place).roaring_bitmap.rb_to_array(res_data); + offsets_to.push_back(offsets_to.back() + count); + } + } +}; + +void registerKeAggregateFunctionsBitmap(AggregateFunctionFactory & factory); +} diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp index 84eb55f379..8187e858ec 100644 --- a/cpp-ch/local-engine/Common/CHUtil.cpp +++ b/cpp-ch/local-engine/Common/CHUtil.cpp @@ -22,6 +22,7 @@ #include <optional> #include <unistd.h> #include <AggregateFunctions/Combinators/AggregateFunctionCombinatorFactory.h> +#include <AggregateFunctions/KeAggregateBitmapFunctions.h> #include <AggregateFunctions/registerAggregateFunctions.h> #include <Columns/ColumnArray.h> #include <Columns/ColumnConst.h> @@ -881,6 +882,7 @@ void registerAllFunctions() registerAggregateFunctionsBloomFilter(agg_factory); registerAggregateFunctionSparkAvg(agg_factory); registerAggregateFunctionRowNumGroup(agg_factory); + registerKeAggregateFunctionsBitmap(agg_factory); DB::registerAggregateFunctionUniqHyperLogLogPlusPlus(agg_factory); 
registerAggregateFunctionDVRoaringBitmap(agg_factory); diff --git a/cpp-ch/local-engine/Functions/CMakeLists.txt b/cpp-ch/local-engine/Functions/CMakeLists.txt index 9e31595e03..f0371c4db5 100644 --- a/cpp-ch/local-engine/Functions/CMakeLists.txt +++ b/cpp-ch/local-engine/Functions/CMakeLists.txt @@ -41,6 +41,7 @@ list( ch_contrib::cityhash ch_contrib::farmhash ch_contrib::xxHash + ch_contrib::roaring OpenSSL::SSL) if(TARGET ch_contrib::vectorscan) diff --git a/cpp-ch/local-engine/Functions/SparkFunctionBitmapCardinality.cpp b/cpp-ch/local-engine/Functions/SparkFunctionBitmapCardinality.cpp new file mode 100644 index 0000000000..fe9a3aa970 --- /dev/null +++ b/cpp-ch/local-engine/Functions/SparkFunctionBitmapCardinality.cpp @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+#include "SparkFunctionBitmapCardinality.h"
+#include <string>
+#include <DataTypes/DataTypeNullable.h>
+#include <DataTypes/DataTypesNumber.h>
+#include <Functions/FunctionFactory.h>
+#include <Functions/FunctionHelpers.h>
+#include <Common/Exception.h>
+#include "AggregateFunctions/KeAggregateBitmapData.h"
+
+namespace DB
+{
+namespace ErrorCodes
+{
+    extern const int ILLEGAL_TYPE_OF_ARGUMENT;
+    extern const int NOT_IMPLEMENTED;
+    extern const int ILLEGAL_COLUMN;
+    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
+}
+}
+
+namespace local_engine
+{
+using namespace DB;
+
+/// Validates the argument list and returns the result type.
+/// Exactly one argument of type String is accepted; the result type is UInt64.
+/// Throws NUMBER_OF_ARGUMENTS_DOESNT_MATCH for a wrong argument count and
+/// ILLEGAL_TYPE_OF_ARGUMENT when the argument is not a String.
+DB::DataTypePtr SparkFunctionBitmapCardinality::getReturnTypeImpl(const DB::ColumnsWithTypeAndName & arguments) const
+{
+    if (arguments.size() != 1)
+    {
+        throw DB::Exception(
+            DB::ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} requires 1 arguments, passed {}", getName(), arguments.size());
+    }
+
+    // This is a check on the argument's data type, so it is reported as
+    // ILLEGAL_TYPE_OF_ARGUMENT rather than ILLEGAL_COLUMN.
+    if (!DB::WhichDataType(arguments[0].type).isString())
+    {
+        throw DB::Exception(DB::ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "All arguments for function {} must be String", getName());
+    }
+    return std::make_shared<DataTypeNumber<UInt64>>();
+}
+
+/// Computes the cardinality of the serialized bitmap stored in each row of the first argument.
+/// An empty string produces the default value of the result column; any other value is decoded
+/// through ReadBufferFromJavaBitmap into a KeRoaringBitmapData<Int64> whose size() is inserted.
+DB::ColumnPtr SparkFunctionBitmapCardinality::executeImpl(
+    const DB::ColumnsWithTypeAndName & arguments, const DB::DataTypePtr & result_type, const size_t input_rows_count) const
+{
+    const auto & bitmap_column = arguments[0].column;
+
+    auto res = result_type->createColumn();
+    res->reserve(input_rows_count);
+
+    for (size_t i = 0; i < input_rows_count; ++i)
+    {
+        const auto data_str = bitmap_column->getDataAt(i);
+        if (data_str.empty())
+        {
+            res->insertDefault();
+            continue;
+        }
+
+        // Both helpers live only for this row, so they are kept on the stack
+        // instead of being heap-allocated once per row.
+        ReadBufferFromJavaBitmap java_bitmap_buffer(const_cast<char *>(data_str.data), data_str.size);
+        KeRoaringBitmapData<Int64> bitmap;
+        bitmap.read(java_bitmap_buffer);
+        res->insert(bitmap.size());
+    }
+    return res;
+}
+
+REGISTER_FUNCTION(SparkFunctionBitmapCardinality)
+{
+    factory.registerFunction<SparkFunctionBitmapCardinality>();
+}
+}
diff --git
a/cpp-ch/local-engine/Functions/SparkFunctionBitmapCardinality.h b/cpp-ch/local-engine/Functions/SparkFunctionBitmapCardinality.h new file mode 100644 index 0000000000..ba02c82b39 --- /dev/null +++ b/cpp-ch/local-engine/Functions/SparkFunctionBitmapCardinality.h @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+#pragma once
+#include <Columns/IColumn.h>
+#include <Core/ColumnsWithTypeAndName.h>
+#include <DataTypes/DataTypeNullable.h>
+#include <DataTypes/IDataType.h>
+#include <Functions/IFunction.h>
+#include <Interpreters/Context.h>
+
+namespace local_engine
+{
+/// Function exposed under the name `keBitmapCardinality`.
+/// getReturnTypeImpl() and executeImpl() are only declared here; their definitions are elsewhere.
+class SparkFunctionBitmapCardinality : public DB::IFunction
+{
+public:
+    static constexpr auto name = "keBitmapCardinality";
+    /// Factory entry point; the context argument is not used.
+    static DB::FunctionPtr create(DB::ContextPtr) { return std::make_shared<SparkFunctionBitmapCardinality>(); }
+    SparkFunctionBitmapCardinality() = default;
+    ~SparkFunctionBitmapCardinality() override = default;
+
+    String getName() const override { return name; }
+
+    // NOTE(review): getNumberOfArguments() returns 1 while isVariadic() returns true.
+    // Presumably the argument count is validated in getReturnTypeImpl() instead; confirm
+    // whether isVariadic() is meant to be false.
+    size_t getNumberOfArguments() const override { return 1; }
+    bool isVariadic() const override { return true; }
+    bool isSuitableForShortCircuitArgumentsExecution(const DB::DataTypesWithConstInfo & /*arguments*/) const override { return true; }
+
+    DB::DataTypePtr getReturnTypeImpl(const DB::ColumnsWithTypeAndName & arguments) const override;
+
+    DB::ColumnPtr executeImpl(
+        const DB::ColumnsWithTypeAndName & arguments,
+        const DB::DataTypePtr & result_type,
+        const size_t /*input_rows_count*/) const override;
+
+    bool useDefaultImplementationForConstants() const override { return true; }
+};
+}
diff --git a/cpp-ch/local-engine/Parser/aggregate_function_parser/CommonAggregateFunctionParser.cpp b/cpp-ch/local-engine/Parser/aggregate_function_parser/CommonAggregateFunctionParser.cpp
index ca448edb57..2369d6cdd7 100644
--- a/cpp-ch/local-engine/Parser/aggregate_function_parser/CommonAggregateFunctionParser.cpp
+++ b/cpp-ch/local-engine/Parser/aggregate_function_parser/CommonAggregateFunctionParser.cpp
@@ -20,6 +20,10 @@
 namespace local_engine
 {
+REGISTER_COMMON_AGGREGATE_FUNCTION_PARSER(KeBitmapOrCardinality, ke_bitmap_or_cardinality, ke_bitmap_or_cardinality)
+REGISTER_COMMON_AGGREGATE_FUNCTION_PARSER(KeBitmapOrData, ke_bitmap_or_data, ke_bitmap_or_data)
+REGISTER_COMMON_AGGREGATE_FUNCTION_PARSER(KeBitmapAndValue, ke_bitmap_and_value, ke_bitmap_and_value)
+REGISTER_COMMON_AGGREGATE_FUNCTION_PARSER(KeBitmapAndIds, ke_bitmap_and_ids, ke_bitmap_and_ids)
 REGISTER_COMMON_AGGREGATE_FUNCTION_PARSER(Sum, sum, sum)
 REGISTER_COMMON_AGGREGATE_FUNCTION_PARSER(Avg, avg, avg)
diff --git a/cpp-ch/local-engine/Storages/ReadBufferFromJavaBitmap.cpp b/cpp-ch/local-engine/Storages/ReadBufferFromJavaBitmap.cpp
new file mode 100644
index 0000000000..1bc532fee1
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/ReadBufferFromJavaBitmap.cpp
@@ -0,0 +1,53 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "ReadBufferFromJavaBitmap.h"
+#include <optional>
+#include <IO/ReadBuffer.h>
+#include <IO/WriteBufferFromString.h>
+#include <IO/WriteHelpers.h>
+#include <Common/Exception.h>
+
+namespace DB
+{
+namespace ErrorCodes
+{
+    extern const int BAD_ARGUMENTS;
+}
+}
+
+namespace local_engine
+{
+
+namespace
+{
+/// Number of leading bytes of the Java payload that are replaced by the synthesized header:
+/// byte 0 is skipped and bytes 1..4 hold the map count.
+constexpr size_t java_header_size = 5;
+}
+
+/// Wraps a Java-serialized bitmap so that it can be consumed through the ReadBuffer interface.
+///
+/// The first `java_header_size` bytes of the input are not exposed as-is. Instead a replacement
+/// header is synthesized into an owned string and served first:
+///   - a VarUInt with the value `original_data_size + 3` (the 5 Java header bytes are replaced
+///     by an 8-byte count, hence +3),
+///   - the map count as an 8-byte integer.
+/// The remaining input bytes are served by the first call to nextImpl().
+///
+/// Byte 0 of the input is skipped (presumably a flag byte of the Java format; verify against
+/// the writer). Bytes 1..4 are decoded as a big-endian unsigned 32-bit count.
+///
+/// @param original_ptr        start of the Java-serialized bytes; must stay valid while this buffer is used
+/// @param original_data_size  number of bytes available at `original_ptr`
+/// @throws DB::Exception (BAD_ARGUMENTS) if the input is null or shorter than the 5-byte header
+ReadBufferFromJavaBitmap::ReadBufferFromJavaBitmap(char * original_ptr, size_t original_data_size) : DB::ReadBuffer(original_ptr, 0)
+{
+    // Without this check the header bytes below would be read out of bounds and
+    // `original_size - 5` in nextImpl() would wrap around.
+    if (original_ptr == nullptr || original_data_size < java_header_size)
+        throw DB::Exception(
+            DB::ErrorCodes::BAD_ARGUMENTS,
+            "Java bitmap data is too short: expected at least {} bytes, got {}",
+            java_header_size,
+            original_data_size);
+
+    original_size = original_data_size;
+    ch_bitmap_data_buffer = std::make_unique<WriteBufferFromOwnString>();
+    writeVarUInt(original_size + 3, *ch_bitmap_data_buffer);
+
+    // Big-endian decode: shift the accumulated value and OR in the next byte.
+    // (Accumulating with `+=` would add the previous value a second time and
+    // yield 257 * old + byte, which is wrong for any count >= 256.)
+    UInt32 map_count = 0;
+    for (size_t i = 1; i < java_header_size; ++i)
+        map_count = (map_count << 8) | static_cast<unsigned char>(original_ptr[i]);
+    const Int64 map_size = map_count;
+    writeBinary(map_size, *ch_bitmap_data_buffer);
+
+    // Serve the synthesized header first; nextImpl() switches to the Java payload afterwards.
+    BufferBase::set(ch_bitmap_data_buffer->str().data(), ch_bitmap_data_buffer->str().size(), 0);
+
+    java_bitmap_data = original_ptr + java_header_size;
+}
+
+/// Called when the current working buffer is exhausted.
+/// The first call switches from the synthesized header to the Java payload that follows the
+/// 5-byte header; any later call reports end of data.
+/// @return true if new data was made available, false otherwise
+bool ReadBufferFromJavaBitmap::nextImpl()
+{
+    if (read_header_finished)
+        return false;
+
+    read_header_finished = true;
+
+    // The constructor guarantees original_size >= java_header_size, so this cannot wrap.
+    const size_t payload_size = original_size - java_header_size;
+    if (payload_size == 0)
+        return false; // nothing follows the header: report end of data instead of an empty buffer
+
+    BufferBase::set(java_bitmap_data, payload_size, 0);
+    return true;
+}
+}
diff --git a/cpp-ch/local-engine/Storages/ReadBufferFromJavaBitmap.h b/cpp-ch/local-engine/Storages/ReadBufferFromJavaBitmap.h
new file mode 100644
index 0000000000..fc06f65b88
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/ReadBufferFromJavaBitmap.h
@@ -0,0 +1,39 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.
You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#pragma once
+
+#include <IO/ReadBuffer.h>
+#include <IO/WriteBufferFromString.h>
+
+namespace local_engine
+{
+using namespace DB;
+
+/// ReadBuffer over a Java-serialized bitmap given as a raw pointer and a length.
+/// The constructor and nextImpl() are only declared here; their definitions are elsewhere.
+class ReadBufferFromJavaBitmap : public ReadBuffer
+{
+public:
+    /// @param original_ptr        start of the serialized bytes
+    /// @param original_data_size  number of bytes at `original_ptr`
+    ReadBufferFromJavaBitmap(char * original_ptr, size_t original_data_size);
+
+private:
+    // Presumably the byte length passed to the constructor; verify against the definition.
+    size_t original_size = 0;
+    // Position inside the caller's bytes; it has no in-class initializer.
+    Position java_bitmap_data;
+    // Owned string-backed write buffer; NOTE(review): presumably holds data generated by the
+    // constructor rather than the caller's bytes, confirm in the .cpp.
+    std::unique_ptr<WriteBufferFromOwnString> ch_bitmap_data_buffer;
+    // Starts as false; NOTE(review): judging by the name it tracks whether the header part
+    // has been consumed, confirm in nextImpl().
+    bool read_header_finished = false;
+
+    bool nextImpl() override;
+};
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
