This is an automated email from the ASF dual-hosted git repository.

lwz9103 pushed a commit to branch liquid
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git

commit ee10b53cc62354244aa19768dc231cb3a8c4ebd2
Author: lwz9103 <[email protected]>
AuthorDate: Thu Feb 1 10:42:43 2024 +0800

    support bitmap compatible with ke
    
    - 20250402 Fix conflict due to 
https://github.com/apache/incubator-gluten/pull/9138
    
    (cherry picked from commit 681fb2ac4657f2a3f6b13fd2c234583ab354f611)
---
 .../AggregateFunctions/KeAggregateBitmapData.h     | 371 +++++++++++++++++
 .../KeAggregateBitmapFunctions.cpp                 | 122 ++++++
 .../KeAggregateBitmapFunctions.h                   | 438 +++++++++++++++++++++
 cpp-ch/local-engine/Common/CHUtil.cpp              |   2 +
 cpp-ch/local-engine/Functions/CMakeLists.txt       |   1 +
 .../Functions/SparkFunctionBitmapCardinality.cpp   |  85 ++++
 .../Functions/SparkFunctionBitmapCardinality.h     |  50 +++
 .../CommonAggregateFunctionParser.cpp              |   4 +
 .../Storages/ReadBufferFromJavaBitmap.cpp          |  53 +++
 .../Storages/ReadBufferFromJavaBitmap.h            |  39 ++
 10 files changed, 1165 insertions(+)

diff --git a/cpp-ch/local-engine/AggregateFunctions/KeAggregateBitmapData.h 
b/cpp-ch/local-engine/AggregateFunctions/KeAggregateBitmapData.h
new file mode 100644
index 0000000000..e70c6817c4
--- /dev/null
+++ b/cpp-ch/local-engine/AggregateFunctions/KeAggregateBitmapData.h
@@ -0,0 +1,371 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include <stdio.h>
+#include <stdlib.h>
+#include <time.h>
+#include <IO/ReadHelpers.h>
+#include <IO/WriteHelpers.h>
+#include <Common/PODArray.h>
+#include "Storages/ReadBufferFromJavaBitmap.h"
+
+// Include this header last, because it is an auto-generated dump of 
questionable
+// garbage that breaks the build (e.g. it changes _POSIX_C_SOURCE).
+// TODO: find out what it is. On github, they have proper interface headers 
like
+// this one: 
https://github.com/RoaringBitmap/CRoaring/blob/master/include/roaring/roaring.h
+#include <roaring.hh>
+#include <roaring64map.hh>
+
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+    extern const int TOO_LARGE_ARRAY_SIZE;
+    extern const int INCORRECT_DATA;
+}
+}
+
+namespace local_engine
+{
+using namespace DB;
+
+/**
+  * Roaring bitmap data.
+  * For a description of the roaring_bitmap_t, see: 
https://github.com/RoaringBitmap/CRoaring
+  */
+template <typename T>
+class KeRoaringBitmapData : private boost::noncopyable
+{
+public:
+    using RoaringBitmap = std::conditional_t<sizeof(T) >= 8, 
roaring::Roaring64Map, roaring::Roaring>;
+    using Value = UInt64;
+    RoaringBitmap roaring_bitmap;
+
+    void add(T value) { roaring_bitmap.add(static_cast<Value>(value)); }
+
+    UInt64 size() const { return roaring_bitmap.cardinality(); }
+
+    void merge(const KeRoaringBitmapData & r1) { roaring_bitmap |= 
r1.roaring_bitmap; }
+
+    void read(DB::ReadBuffer & in)
+    {
+        static thread_local String buf;
+        size_t size;
+        readVarUInt(size, in);
+
+        static constexpr size_t max_size = 100_GiB;
+
+        if (size == 0)
+            throw Exception(ErrorCodes::INCORRECT_DATA, "Incorrect size (0) in 
groupBitmap.");
+        if (size > max_size)
+            throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Too large array 
size in groupBitmap (maximum: {})", max_size);
+
+        if (in.available() > size)
+        {
+            roaring_bitmap = RoaringBitmap::readSafe(in.position(), size);
+            in.ignore(size);
+        }
+        else
+        {
+            buf.reserve(size);
+            in.readStrict(buf.data(), size);
+            roaring_bitmap = RoaringBitmap::readSafe(buf.data(), size);
+        }
+    }
+
+    void write(DB::WriteBuffer & out) const
+    {
+        auto size = roaring_bitmap.getSizeInBytes();
+        writeVarUInt(size, out);
+        if (out.available() > size)
+        {
+            roaring_bitmap.write(out.position());
+            out.position() += size;
+        }
+        else
+        {
+            std::unique_ptr<char[]> buf(new char[size]);
+            roaring_bitmap.write(buf.get());
+            out.write(buf.get(), size);
+        }
+    }
+
+    void to_ke_bitmap_data(DB::WriteBuffer & ke_bitmap_data_buffer) const
+    {
+        auto size = roaring_bitmap.getSizeInBytes();
+
+        std::unique_ptr<char[]> buf(new char[size]);
+        roaring_bitmap.write(buf.get());
+
+        Int8 signedLongs = 0;
+        writeBinary(signedLongs, ke_bitmap_data_buffer);
+
+        writeBinary(static_cast<unsigned char>(buf.get()[3]), 
ke_bitmap_data_buffer);
+        writeBinary(static_cast<unsigned char>(buf.get()[2]), 
ke_bitmap_data_buffer);
+        writeBinary(static_cast<unsigned char>(buf.get()[1]), 
ke_bitmap_data_buffer);
+        writeBinary(static_cast<unsigned char>(buf.get()[0]), 
ke_bitmap_data_buffer);
+
+        auto bitmap_data = buf.get() + 8;
+        ke_bitmap_data_buffer.write(bitmap_data, size - 8);
+    }
+
+    /**
+     * Computes the intersection between two bitmaps
+     */
+    void rb_and(const KeRoaringBitmapData & r1) /// NOLINT
+    {
+        roaring_bitmap &= r1.roaring_bitmap;
+    }
+
+    /**
+     * Computes the union between two bitmaps.
+     */
+    void rb_or(const KeRoaringBitmapData & r1)
+    {
+        merge(r1); /// NOLINT
+    }
+
+    /**
+     * Computes the symmetric difference (xor) between two bitmaps.
+     */
+    void rb_xor(const KeRoaringBitmapData & r1) /// NOLINT
+    {
+        roaring_bitmap ^= r1.roaring_bitmap;
+    }
+
+    /**
+     * Computes the difference (andnot) between two bitmaps
+     */
+    void rb_andnot(const KeRoaringBitmapData & r1) /// NOLINT
+    {
+        roaring_bitmap -= r1.roaring_bitmap;
+    }
+
+    /**
+     * Computes the cardinality of the intersection between two bitmaps.
+     */
+    UInt64 rb_and_cardinality(const KeRoaringBitmapData & r1) const /// NOLINT
+    {
+        return (roaring_bitmap & r1.roaring_bitmap).cardinality();
+    }
+
+    /**
+     * Computes the cardinality of the union between two bitmaps.
+     */
+    UInt64 rb_or_cardinality(const KeRoaringBitmapData & r1) const /// NOLINT
+    {
+        UInt64 c1 = size();
+        UInt64 c2 = r1.size();
+        UInt64 inter = rb_and_cardinality(r1);
+        return c1 + c2 - inter;
+    }
+
+    /**
+     * Computes the cardinality of the symmetric difference (andnot) between 
two bitmaps.
+     */
+    UInt64 rb_xor_cardinality(const KeRoaringBitmapData & r1) const /// NOLINT
+    {
+        UInt64 c1 = size();
+        UInt64 c2 = r1.size();
+        UInt64 inter = rb_and_cardinality(r1);
+        return c1 + c2 - 2 * inter;
+    }
+
+    /**
+     * Computes the cardinality of the difference (andnot) between two bitmaps.
+     */
+    UInt64 rb_andnot_cardinality(const KeRoaringBitmapData & r1) const /// 
NOLINT
+    {
+        UInt64 c1 = size();
+        UInt64 inter = rb_and_cardinality(r1);
+        return c1 - inter;
+    }
+
+    /**
+     * Return 1 if the two bitmaps contain the same elements.
+     */
+    UInt8 rb_equals(const KeRoaringBitmapData & r1) /// NOLINT
+    {
+        return roaring_bitmap == r1.roaring_bitmap;
+    }
+
+    /**
+     * Check whether two bitmaps intersect.
+     * Intersection with an empty set is always 0 (consistent with hasAny).
+     */
+    UInt8 rb_intersect(const KeRoaringBitmapData & r1) const /// NOLINT
+    {
+        if ((roaring_bitmap & r1.roaring_bitmap).cardinality() > 0)
+            return 1;
+        return 0;
+    }
+
+    /**
+     * Check whether the argument is the subset of this set.
+     * Empty set is a subset of any other set (consistent with hasAll).
+     * It's used in subset and currently only support comparing same type
+     */
+    UInt8 rb_is_subset(const KeRoaringBitmapData & r1) const /// NOLINT
+    {
+        if (!r1.roaring_bitmap.isSubset(roaring_bitmap))
+            return 0;
+        return 1;
+    }
+
+    /**
+     * Check whether this bitmap contains the argument.
+     */
+    UInt8 rb_contains(UInt64 x) const /// NOLINT
+    {
+        if (!std::is_same_v<T, UInt64> && x > rb_max())
+            return 0;
+
+        UInt32 high_bytes = uint32_t(x >> 32);
+        UInt32 high_bytes_new
+            = ((high_bytes >> 24)) | ((high_bytes >> 8) & 0xFF00) | 
((high_bytes << 8) & 0xFF0000) | ((high_bytes << 24));
+        UInt64 value = (uint64_t(high_bytes_new) << 32) | 
uint64_t(uint32_t(x));
+
+        return roaring_bitmap.contains(value);
+    }
+
+    /**
+     * Convert elements to integer array, return number of elements
+     */
+    template <typename Element>
+    UInt64 rb_to_array(PaddedPODArray<Element> & res) const /// NOLINT
+    {
+        UInt64 count = 0;
+        for (auto it = roaring_bitmap.begin(); it != roaring_bitmap.end(); 
++it)
+        {
+            // reverse high 4 bytes to Little-endian
+            Int64 original_value = *it;
+            UInt32 high_bytes = uint32_t(original_value >> 32);
+            UInt32 high_bytes_new
+                = ((high_bytes >> 24)) | ((high_bytes >> 8) & 0xFF00) | 
((high_bytes << 8) & 0xFF0000) | ((high_bytes << 24));
+            Int64 value = (uint64_t(high_bytes_new) << 32) | 
uint64_t(uint32_t(original_value));
+            res.emplace_back(value);
+            ++count;
+        }
+        return count;
+    }
+
+    /**
+     * Return new set with specified range (not include the range_end)
+     * It's used in subset and currently only support UInt32
+     */
+    UInt64 rb_range(UInt64 range_start, UInt64 range_end, KeRoaringBitmapData 
& r1) const /// NOLINT
+    {
+        UInt64 count = 0;
+        if (range_start >= range_end)
+            return count;
+
+        for (auto it = roaring_bitmap.begin(); it != roaring_bitmap.end(); 
++it)
+        {
+            if (*it < range_start)
+                continue;
+
+            if (*it < range_end)
+            {
+                r1.add(*it);
+                ++count;
+            }
+            else
+                break;
+        }
+        return count;
+    }
+
+    /**
+     * Return new set of the smallest `limit` values in set which is no less 
than `range_start`.
+     * It's used in subset and currently only support UInt32
+     */
+    UInt64 rb_limit(UInt64 range_start, UInt64 limit, KeRoaringBitmapData & 
r1) const /// NOLINT
+    {
+        if (limit == 0)
+            return 0;
+
+        UInt64 count = 0;
+        for (auto it = roaring_bitmap.begin(); it != roaring_bitmap.end(); 
++it)
+        {
+            if (*it < range_start)
+                continue;
+
+            if (count < limit)
+            {
+                r1.add(*it);
+                ++count;
+            }
+            else
+                break;
+        }
+        return count;
+    }
+
+    UInt64 rb_offset_limit(UInt64 offset, UInt64 limit, KeRoaringBitmapData & 
r1) const /// NOLINT
+    {
+        if (limit == 0 || offset >= size())
+            return 0;
+
+        UInt64 count = 0;
+        UInt64 offset_count = 0;
+        auto it = roaring_bitmap.begin();
+        for (; it != roaring_bitmap.end() && offset_count < offset; ++it)
+            ++offset_count;
+
+        for (; it != roaring_bitmap.end() && count < limit; ++it, ++count)
+            r1.add(*it);
+        return count;
+    }
+
+    UInt64 rb_min() const /// NOLINT
+    {
+        return roaring_bitmap.minimum();
+    }
+
+    UInt64 rb_max() const /// NOLINT
+    {
+        return roaring_bitmap.maximum();
+    }
+
+    /**
+     * Replace value.
+     * It's used in transform and currently can only support UInt32
+     */
+    void rb_replace(const UInt64 * from_vals, const UInt64 * to_vals, size_t 
num) /// NOLINT
+    {
+        for (size_t i = 0; i < num; ++i)
+        {
+            if (from_vals[i] == to_vals[i])
+                continue;
+            bool changed = 
roaring_bitmap.removeChecked(static_cast<Value>(from_vals[i]));
+            if (changed)
+                roaring_bitmap.add(static_cast<Value>(to_vals[i]));
+        }
+    }
+};
+
+template <typename T>
+struct KeAggregateBitmapData
+{
+    // If false, all bitmap operations will be treated as merge to initialize 
the state
+    bool init = false;
+    KeRoaringBitmapData<T> roaring_bitmap;
+    static const char * name() { return "keBitmap"; }
+};
+
+}
diff --git 
a/cpp-ch/local-engine/AggregateFunctions/KeAggregateBitmapFunctions.cpp 
b/cpp-ch/local-engine/AggregateFunctions/KeAggregateBitmapFunctions.cpp
new file mode 100644
index 0000000000..2747895af2
--- /dev/null
+++ b/cpp-ch/local-engine/AggregateFunctions/KeAggregateBitmapFunctions.cpp
@@ -0,0 +1,122 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include <AggregateFunctions/AggregateFunctionFactory.h>
+#include <AggregateFunctions/FactoryHelpers.h>
+#include <DataTypes/DataTypeAggregateFunction.h>
+
+#include <AggregateFunctions/KeAggregateBitmapFunctions.h>
+
+
+namespace DB
+{
+struct Settings;
+
+namespace ErrorCodes
+{
+    extern const int ILLEGAL_TYPE_OF_ARGUMENT;
+}
+}
+
+namespace local_engine
+{
+using namespace DB;
+
+AggregateFunctionPtr
+createKeAggregateBitmapOrCardinalityFunction(const std::string & name, const 
DataTypes & argument_types, const Array & parameters, const Settings *)
+{
+    assertNoParameters(name, parameters);
+    assertUnary(name, argument_types);
+    DataTypePtr argument_type_ptr = argument_types[0];
+    AggregateFunctionPtr res(
+        new local_engine::KeAggregateBitmapOrCardinality<Int64, 
local_engine::KeAggregateBitmapData<Int64>>(argument_type_ptr));
+
+    if (!res)
+        throw Exception(
+            ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
+            "Illegal type {} of argument for aggregate function {}",
+            argument_types[0]->getName(),
+            name);
+
+    return res;
+}
+
+AggregateFunctionPtr
+createKeAggregateBitmapOrDataFunction(const std::string & name, const 
DataTypes & argument_types, const Array & parameters, const Settings *)
+{
+    assertNoParameters(name, parameters);
+    assertUnary(name, argument_types);
+    DataTypePtr argument_type_ptr = argument_types[0];
+    AggregateFunctionPtr res(
+        new local_engine::KeAggregateBitmapOr<std::string, 
local_engine::KeAggregateBitmapData<Int64>>(argument_type_ptr));
+
+    if (!res)
+        throw Exception(
+            ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
+            "Illegal type {} of argument for aggregate function {}",
+            argument_types[0]->getName(),
+            name);
+
+    return res;
+}
+
+AggregateFunctionPtr createKeAggregateBitmapAndValueFunction(
+    const std::string & name, const DataTypes & argument_types, const Array & 
parameters, const Settings *)
+{
+    assertNoParameters(name, parameters);
+    assertUnary(name, argument_types);
+    DataTypePtr argument_type_ptr = argument_types[0];
+    AggregateFunctionPtr res(
+        new local_engine::KeAggregateBitmapAndValue<Int64, 
local_engine::KeAggregateBitmapData<Int64>>(argument_type_ptr));
+
+    if (!res)
+        throw Exception(
+            ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
+            "Illegal type {} of argument for aggregate function {}",
+            argument_types[0]->getName(),
+            name);
+
+    return res;
+}
+
+AggregateFunctionPtr createKeAggregateBitmapAndIdsFunction(
+    const std::string & name, const DataTypes & argument_types, const Array & 
parameters, const Settings *)
+{
+    assertNoParameters(name, parameters);
+    assertUnary(name, argument_types);
+    DataTypePtr argument_type_ptr = argument_types[0];
+    AggregateFunctionPtr res(
+        new local_engine::KeAggregateBitmapAndIds<Int64, 
local_engine::KeAggregateBitmapData<Int64>>(argument_type_ptr));
+
+    if (!res)
+        throw Exception(
+            ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
+            "Illegal type {} of argument for aggregate function {}",
+            argument_types[0]->getName(),
+            name);
+
+    return res;
+}
+
+void registerKeAggregateFunctionsBitmap(AggregateFunctionFactory & factory)
+{
+    AggregateFunctionProperties properties = { .returns_default_when_only_null 
= true };
+    factory.registerFunction("ke_bitmap_or_cardinality", 
{createKeAggregateBitmapOrCardinalityFunction, properties}, 
AggregateFunctionFactory::CaseInsensitive);
+    factory.registerFunction("ke_bitmap_or_data", 
{createKeAggregateBitmapOrDataFunction, properties}, 
AggregateFunctionFactory::CaseInsensitive);
+    factory.registerFunction("ke_bitmap_and_value", { 
createKeAggregateBitmapAndValueFunction, properties}, 
AggregateFunctionFactory::CaseInsensitive);
+    factory.registerFunction("ke_bitmap_and_ids", 
{createKeAggregateBitmapAndIdsFunction, properties}, 
AggregateFunctionFactory::CaseInsensitive);
+}
+}
diff --git 
a/cpp-ch/local-engine/AggregateFunctions/KeAggregateBitmapFunctions.h 
b/cpp-ch/local-engine/AggregateFunctions/KeAggregateBitmapFunctions.h
new file mode 100644
index 0000000000..f475cf174e
--- /dev/null
+++ b/cpp-ch/local-engine/AggregateFunctions/KeAggregateBitmapFunctions.h
@@ -0,0 +1,438 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#pragma once
+
+#include <AggregateFunctions/AggregateFunctionFactory.h>
+#include <AggregateFunctions/IAggregateFunction.h>
+#include <AggregateFunctions/Combinators/AggregateFunctionNull.h>
+#include <Columns/ColumnAggregateFunction.h>
+#include <Columns/ColumnString.h>
+#include <Columns/ColumnVector.h>
+#include <DataTypes/DataTypeArray.h>
+#include <DataTypes/DataTypeString.h>
+#include <DataTypes/DataTypesNumber.h>
+#include <Storages/ReadBufferFromJavaBitmap.h>
+#include <AggregateFunctions/KeAggregateBitmapData.h>
+#include <Common/assert_cast.h>
+
+namespace local_engine
+{
+
+using namespace DB;
+
+// For handle null values
+template <bool result_is_nullable, bool serialize_flag>
+class SparkAggregateBitmapNullUnary final
+    : public AggregateFunctionNullBase<result_is_nullable, serialize_flag,
+                                       
SparkAggregateBitmapNullUnary<result_is_nullable, serialize_flag>>
+{
+public:
+    SparkAggregateBitmapNullUnary(AggregateFunctionPtr nested_function_, const 
DataTypes & arguments, const Array & params)
+        : AggregateFunctionNullBase<result_is_nullable, serialize_flag,
+                                    
SparkAggregateBitmapNullUnary<result_is_nullable, 
serialize_flag>>(std::move(nested_function_), arguments, params)
+    {
+    }
+
+    void add(AggregateDataPtr __restrict place, const IColumn ** columns, 
size_t row_num, Arena * arena) const override
+    {
+        const ColumnNullable * column = assert_cast<const ColumnNullable 
*>(columns[0]);
+        if (column->isNullAt(row_num)) {
+            return;
+        }
+        const IColumn * nested_column = &column->getNestedColumn();
+        this->nested_function->add(place, &nested_column, row_num, arena);
+    }
+};
+
+template <typename T, typename Data>
+class KeAggregateBitmapOrCardinality final : public 
IAggregateFunctionDataHelper<Data, KeAggregateBitmapOrCardinality<T, Data>>
+{
+public:
+    explicit KeAggregateBitmapOrCardinality(const DB::DataTypePtr & type)
+        : IAggregateFunctionDataHelper<Data, KeAggregateBitmapOrCardinality<T, 
Data>>({type}, {}, createResultType())
+    {
+    }
+
+    ~KeAggregateBitmapOrCardinality() override
+    {
+    }
+
+    String getName() const override { return "ke_bitmap_or_cardinality"; }
+
+    static DataTypePtr createResultType() { return 
std::make_shared<DataTypeNumber<T>>(); }
+
+    bool allocatesMemoryInArena() const override { return false; }
+
+    AggregateFunctionPtr getOwnNullAdapter(
+        const AggregateFunctionPtr & nested_function, const DataTypes & types, 
const Array & params, const AggregateFunctionProperties & /* properties */) 
const override
+    {
+        return std::make_shared<SparkAggregateBitmapNullUnary<false, 
false>>(nested_function, types, params);
+    }
+
+    void create(AggregateDataPtr __restrict place) const override
+    {
+        new (place) Data;
+    }
+
+    void add(AggregateDataPtr __restrict place, const IColumn ** columns, 
size_t row_num, Arena *) const override
+    {
+        Data & data_lhs = this->data(place);
+        auto bitmap_data = assert_cast<const ColumnString 
&>(*columns[0]).getDataAt(row_num);
+
+        if (!data_lhs.init)
+        {
+            data_lhs.init = true;
+            // Null data will be skip in AggregateFunctionNullUnary.add()
+            if (!bitmap_data.empty() && bitmap_data.data)
+            {
+                auto charBuff = 
std::make_unique<ReadBufferFromJavaBitmap>(const_cast<char 
*>(bitmap_data.data), bitmap_data.size);
+                data_lhs.roaring_bitmap.read(*charBuff);
+            }
+        }
+        else
+        {
+            if (!bitmap_data.empty())
+            {
+                auto data_rhs = std::make_unique<Data>();
+                auto charBuff = 
std::make_unique<ReadBufferFromJavaBitmap>(const_cast<char 
*>(bitmap_data.data), bitmap_data.size);
+                data_rhs->roaring_bitmap.read(*charBuff);
+                data_lhs.roaring_bitmap.merge(data_rhs->roaring_bitmap);
+            }
+        }
+    }
+
+    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, 
Arena *) const override
+    {
+        Data & data_lhs = this->data(place);
+        const Data & data_rhs = this->data(rhs);
+
+        if (!data_rhs.init)
+            return;
+
+        if (!data_lhs.init)
+        {
+            data_lhs.init = true;
+        }
+        data_lhs.roaring_bitmap.merge(data_rhs.roaring_bitmap);
+    }
+
+    bool isVersioned() const override { return false; }
+
+    void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, 
std::optional<size_t> /* version */) const override
+    {
+        this->data(place).roaring_bitmap.write(buf);
+    }
+
+    void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, 
std::optional<size_t> /* version */, Arena *) const override
+    {
+        this->data(place).init = true;
+        this->data(place).roaring_bitmap.read(buf);
+    }
+
+    void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, 
Arena *) const override
+    {
+        assert_cast<ColumnVector<T> 
&>(to).getData().push_back(static_cast<T>(this->data(place).roaring_bitmap.size()));
+    }
+};
+
+template <typename T, typename Data>
+class KeAggregateBitmapOr final : public IAggregateFunctionDataHelper<Data, 
KeAggregateBitmapOr<T, Data>>
+{
+public:
+    explicit KeAggregateBitmapOr(const DB::DataTypePtr & type)
+        : IAggregateFunctionDataHelper<Data, KeAggregateBitmapOr<T, 
Data>>({type}, {}, createResultType())
+    {
+    }
+
+    String getName() const override { return "ke_bitmap_or_data"; }
+
+    static DataTypePtr createResultType() { return 
std::make_shared<DataTypeString>(); }
+
+    bool allocatesMemoryInArena() const override { return false; }
+
+    AggregateFunctionPtr getOwnNullAdapter(
+        const AggregateFunctionPtr & nested_function, const DataTypes & types, 
const Array & params, const AggregateFunctionProperties & /*properties*/) const 
override
+    {
+        return std::make_shared<SparkAggregateBitmapNullUnary<false, 
false>>(nested_function, types, params);
+    }
+
+    void create(AggregateDataPtr __restrict place) const override
+    {
+        new (place) Data;
+    }
+
+    void add(AggregateDataPtr __restrict place, const IColumn ** columns, 
size_t row_num, Arena *) const override
+    {
+        Data & data_lhs = this->data(place);
+        auto bitmap_data = assert_cast<const ColumnString 
&>(*columns[0]).getDataAt(row_num);
+
+        if (!data_lhs.init)
+        {
+            data_lhs.init = true;
+            // Null data will be skip in AggregateFunctionNullUnary.add()
+            if (!bitmap_data.empty())
+            {
+                auto charBuff = 
std::make_unique<ReadBufferFromJavaBitmap>(const_cast<char 
*>(bitmap_data.data), bitmap_data.size);
+                data_lhs.roaring_bitmap.read(*charBuff);
+            }
+        }
+        else
+        {
+            if (!bitmap_data.empty())
+            {
+                auto data_rhs = std::make_unique<Data>();
+                auto charBuff = 
std::make_unique<ReadBufferFromJavaBitmap>(const_cast<char 
*>(bitmap_data.data), bitmap_data.size);
+                data_rhs->roaring_bitmap.read(*charBuff);
+                data_lhs.roaring_bitmap.rb_or(data_rhs->roaring_bitmap);
+            }
+        }
+    }
+
+    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, 
Arena *) const override
+    {
+        Data & data_lhs = this->data(place);
+        const Data & data_rhs = this->data(rhs);
+
+        if (!data_rhs.init)
+            return;
+
+        if (!data_lhs.init)
+        {
+            data_lhs.init = true;
+        }
+        data_lhs.roaring_bitmap.rb_or(data_rhs.roaring_bitmap);
+    }
+
+    bool isVersioned() const override { return false; }
+
+    void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, 
std::optional<size_t> /* version */) const override
+    {
+        this->data(place).roaring_bitmap.write(buf);
+    }
+
+    void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, 
std::optional<size_t> /* version */, Arena *) const override
+    {
+        this->data(place).init = true;
+        this->data(place).roaring_bitmap.read(buf);
+    }
+
+    void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, 
Arena *) const override
+    {
+        std::string str;
+        auto write_buffer = 
std::make_unique<WriteBufferFromVector<std::string>>(str);
+        this->data(place).roaring_bitmap.to_ke_bitmap_data(*write_buffer);
+        assert_cast<ColumnString &>(to).insert(str);
+    }
+};
+
+template <typename T, typename Data>
+class KeAggregateBitmapAndValue final : public 
IAggregateFunctionDataHelper<Data, KeAggregateBitmapAndValue<T, Data>>
+{
+public:
+    explicit KeAggregateBitmapAndValue(const DB::DataTypePtr & type)
+        : IAggregateFunctionDataHelper<Data, KeAggregateBitmapAndValue<T, 
Data>>({type}, {}, createResultType())
+    {
+    }
+
+    String getName() const override { return "ke_bitmap_and_value"; }
+
+    static DataTypePtr createResultType() { return 
std::make_shared<DataTypeNumber<T>>(); }
+
+    bool allocatesMemoryInArena() const override { return false; }
+
+    AggregateFunctionPtr getOwnNullAdapter(
+        const AggregateFunctionPtr & nested_function, const DataTypes & types, 
const Array & params, const AggregateFunctionProperties & /*properties*/) const 
override
+    {
+        return std::make_shared<SparkAggregateBitmapNullUnary<false, 
false>>(nested_function, types, params);
+    }
+
+    void create(AggregateDataPtr __restrict place) const override
+    {
+        new (place) Data;
+    }
+
+    void add(AggregateDataPtr __restrict place, const IColumn ** columns, 
size_t row_num, Arena *) const override
+    {
+        Data & data_lhs = this->data(place);
+        auto bitmap_data = assert_cast<const ColumnString 
&>(*columns[0]).getDataAt(row_num);
+
+        if (!data_lhs.init)
+        {
+            data_lhs.init = true;
+            // Null data will be skip in AggregateFunctionNullUnary.add()
+            if (!bitmap_data.empty())
+            {
+                auto charBuff = 
std::make_unique<ReadBufferFromJavaBitmap>(const_cast<char 
*>(bitmap_data.data), bitmap_data.size);
+                data_lhs.roaring_bitmap.read(*charBuff);
+            }
+        }
+        else
+        {
+            auto data_rhs = std::make_unique<Data>();
+            if (!bitmap_data.empty())
+            {
+                auto charBuff = 
std::make_unique<ReadBufferFromJavaBitmap>(const_cast<char 
*>(bitmap_data.data), bitmap_data.size);
+                data_rhs->roaring_bitmap.read(*charBuff);
+            }
+            data_lhs.roaring_bitmap.rb_and(data_rhs->roaring_bitmap);
+        }
+    }
+
+    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, 
Arena *) const override
+    {
+        Data & data_lhs = this->data(place);
+        const Data & data_rhs = this->data(rhs);
+
+        if (!data_rhs.init)
+            return;
+
+        if (!data_lhs.init)
+        {
+            data_lhs.init = true;
+            data_lhs.roaring_bitmap.merge(data_rhs.roaring_bitmap);
+        }
+        else
+        {
+            data_lhs.roaring_bitmap.rb_and(data_rhs.roaring_bitmap);
+        }
+    }
+
+    bool isVersioned() const override { return false; }
+
+    void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, 
std::optional<size_t> /* version */) const override
+    {
+        this->data(place).roaring_bitmap.write(buf);
+    }
+
+    void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, 
std::optional<size_t> /* version */, Arena *) const override
+    {
+        this->data(place).init = true;
+        this->data(place).roaring_bitmap.read(buf);
+    }
+
+    void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, 
Arena *) const override
+    {
+        assert_cast<ColumnVector<T> 
&>(to).getData().push_back(static_cast<T>(this->data(place).roaring_bitmap.size()));
+    }
+};
+
+template <typename T, typename Data>
+class KeAggregateBitmapAndIds final : public 
IAggregateFunctionDataHelper<Data, KeAggregateBitmapAndIds<T, Data>>
+{
+public:
+    explicit KeAggregateBitmapAndIds(const DB::DataTypePtr & type)
+        : IAggregateFunctionDataHelper<Data, KeAggregateBitmapAndIds<T, 
Data>>({type}, {}, createResultType())
+    {
+    }
+
+    String getName() const override { return "ke_bitmap_and_ids"; }
+
+    static DataTypePtr createResultType() { return 
std::make_shared<DataTypeArray>(std::make_shared<DataTypeNumber<T>>()); }
+
+    bool allocatesMemoryInArena() const override { return false; }
+
+    AggregateFunctionPtr getOwnNullAdapter(
+        const AggregateFunctionPtr & nested_function, const DataTypes & types, 
const Array & params, const AggregateFunctionProperties & /*properties*/) const 
override
+    {
+        return std::make_shared<SparkAggregateBitmapNullUnary<false, 
false>>(nested_function, types, params);
+    }
+
+    void create(AggregateDataPtr __restrict place) const override
+    {
+        new (place) Data;
+    }
+
+    void add(AggregateDataPtr __restrict place, const IColumn ** columns, 
size_t row_num, Arena *) const override
+    {
+        Data & data_lhs = this->data(place);
+        auto bitmap_data = assert_cast<const ColumnString 
&>(*columns[0]).getDataAt(row_num);
+
+        if (!data_lhs.init)
+        {
+            data_lhs.init = true;
+            // Null data will be skip in AggregateFunctionNullUnary.add()
+            if (!bitmap_data.empty())
+            {
+                auto charBuff = 
std::make_unique<ReadBufferFromJavaBitmap>(const_cast<char 
*>(bitmap_data.data), bitmap_data.size);
+                data_lhs.roaring_bitmap.read(*charBuff);
+            }
+        }
+        else
+        {
+            auto data_rhs = std::make_unique<Data>();
+            if (!bitmap_data.empty())
+            {
+                auto charBuff = 
std::make_unique<ReadBufferFromJavaBitmap>(const_cast<char 
*>(bitmap_data.data), bitmap_data.size);
+                data_rhs->roaring_bitmap.read(*charBuff);
+            }
+            data_lhs.roaring_bitmap.rb_and(data_rhs->roaring_bitmap);
+        }
+    }
+
+    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, 
Arena *) const override
+    {
+        Data & data_lhs = this->data(place);
+        const Data & data_rhs = this->data(rhs);
+
+        if (!data_rhs.init)
+            return;
+
+        if (!data_lhs.init)
+        {
+            data_lhs.init = true;
+            data_lhs.roaring_bitmap.merge(data_rhs.roaring_bitmap);
+        }
+        else
+        {
+            data_lhs.roaring_bitmap.rb_and(data_rhs.roaring_bitmap);
+        }
+    }
+
+    bool isVersioned() const override { return false; }
+
+    void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, 
std::optional<size_t> /* version */) const override
+    {
+        this->data(place).roaring_bitmap.write(buf);
+    }
+
+    void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, 
std::optional<size_t> /* version */, Arena *) const override
+    {
+        this->data(place).init = true;
+        this->data(place).roaring_bitmap.read(buf);
+    }
+
+    void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, 
Arena *) const override
+    {
+        UInt64 cardinality = this->data(place).roaring_bitmap.size();
+
+        if (cardinality > 10000000)
+        {
+            throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "There are too 
many returned data ({}) for the bitmap.", cardinality);
+        }
+        else
+        {
+            auto & arr_to = assert_cast<ColumnArray &>(to);
+            ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
+            PaddedPODArray<T> & res_data = typeid_cast<ColumnVector<T> 
&>(arr_to.getData()).getData();
+            UInt64 count = 
this->data(place).roaring_bitmap.rb_to_array(res_data);
+            offsets_to.push_back(offsets_to.back() + count);
+        }
+    }
+};
+
+void registerKeAggregateFunctionsBitmap(AggregateFunctionFactory & factory);
+}
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp 
b/cpp-ch/local-engine/Common/CHUtil.cpp
index 84eb55f379..8187e858ec 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -22,6 +22,7 @@
 #include <optional>
 #include <unistd.h>
 #include <AggregateFunctions/Combinators/AggregateFunctionCombinatorFactory.h>
+#include <AggregateFunctions/KeAggregateBitmapFunctions.h>
 #include <AggregateFunctions/registerAggregateFunctions.h>
 #include <Columns/ColumnArray.h>
 #include <Columns/ColumnConst.h>
@@ -881,6 +882,7 @@ void registerAllFunctions()
     registerAggregateFunctionsBloomFilter(agg_factory);
     registerAggregateFunctionSparkAvg(agg_factory);
     registerAggregateFunctionRowNumGroup(agg_factory);
+    registerKeAggregateFunctionsBitmap(agg_factory);
     DB::registerAggregateFunctionUniqHyperLogLogPlusPlus(agg_factory);
     registerAggregateFunctionDVRoaringBitmap(agg_factory);
 
diff --git a/cpp-ch/local-engine/Functions/CMakeLists.txt 
b/cpp-ch/local-engine/Functions/CMakeLists.txt
index 9e31595e03..f0371c4db5 100644
--- a/cpp-ch/local-engine/Functions/CMakeLists.txt
+++ b/cpp-ch/local-engine/Functions/CMakeLists.txt
@@ -41,6 +41,7 @@ list(
   ch_contrib::cityhash
   ch_contrib::farmhash
   ch_contrib::xxHash
+  ch_contrib::roaring
   OpenSSL::SSL)
 
 if(TARGET ch_contrib::vectorscan)
diff --git a/cpp-ch/local-engine/Functions/SparkFunctionBitmapCardinality.cpp 
b/cpp-ch/local-engine/Functions/SparkFunctionBitmapCardinality.cpp
new file mode 100644
index 0000000000..fe9a3aa970
--- /dev/null
+++ b/cpp-ch/local-engine/Functions/SparkFunctionBitmapCardinality.cpp
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "SparkFunctionBitmapCardinality.h"
+#include <string>
+#include <DataTypes/DataTypeNullable.h>
+#include <DataTypes/DataTypesNumber.h>
+#include <Functions/FunctionFactory.h>
+#include <Functions/FunctionHelpers.h>
+#include <Common/Exception.h>
+#include "AggregateFunctions/KeAggregateBitmapData.h"
+
+namespace DB
+{
+namespace ErrorCodes
+{
+    extern const int ILLEGAL_TYPE_OF_ARGUMENT;
+    extern const int NOT_IMPLEMENTED;
+    extern const int ILLEGAL_COLUMN;
+    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
+}
+}
+
+namespace local_engine
+{
+using namespace DB;
+DB::DataTypePtr SparkFunctionBitmapCardinality::getReturnTypeImpl(const 
DB::ColumnsWithTypeAndName & arguments) const
+{
+    if (arguments.size() != 1)
+    {
+        throw DB::Exception(
+            DB::ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} 
requires 1 arguments, passed {}", getName(), arguments.size());
+    }
+
+    if (!DB::WhichDataType(arguments[0].type).isString())
+    {
+        throw DB::Exception(DB::ErrorCodes::ILLEGAL_COLUMN, "All arguments for 
function {} must be String", getName());
+    }
+    return std::make_shared<DataTypeNumber<UInt64>>();
+}
+
+DB::ColumnPtr SparkFunctionBitmapCardinality::executeImpl(
+    const DB::ColumnsWithTypeAndName & arguments, const DB::DataTypePtr & 
result_type, const size_t input_rows_count) const
+{
+    auto x = arguments[0].column;
+
+    auto res = result_type->createColumn();
+    res->reserve(input_rows_count);
+
+    for (size_t i = 0; i < input_rows_count; ++i)
+    {
+        auto data_str = x->getDataAt(i);
+        if (data_str.empty())
+        {
+            res->insertDefault();
+        }
+        else
+        {
+            auto charBuff = 
std::make_unique<ReadBufferFromJavaBitmap>(const_cast<char *>(data_str.data), 
data_str.size);
+            auto bitmap = std::make_unique<KeRoaringBitmapData<Int64>>();
+            bitmap->read(*charBuff);
+            res->insert(bitmap->size());
+        }
+    }
+    return res;
+}
+
+REGISTER_FUNCTION(SparkFunctionBitmapCardinality)
+{
+    factory.registerFunction<SparkFunctionBitmapCardinality>();
+}
+}
diff --git a/cpp-ch/local-engine/Functions/SparkFunctionBitmapCardinality.h 
b/cpp-ch/local-engine/Functions/SparkFunctionBitmapCardinality.h
new file mode 100644
index 0000000000..ba02c82b39
--- /dev/null
+++ b/cpp-ch/local-engine/Functions/SparkFunctionBitmapCardinality.h
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <Columns/IColumn.h>
+#include <Core/ColumnsWithTypeAndName.h>
+#include <DataTypes/DataTypeNullable.h>
+#include <DataTypes/IDataType.h>
+#include <Functions/IFunction.h>
+#include <Interpreters/Context.h>
+
+namespace local_engine
+{
+class SparkFunctionBitmapCardinality : public DB::IFunction
+{
+public:
+    static constexpr auto name = "keBitmapCardinality";
+    static DB::FunctionPtr create(DB::ContextPtr) { return 
std::make_shared<SparkFunctionBitmapCardinality>(); }
+    SparkFunctionBitmapCardinality() = default;
+    ~SparkFunctionBitmapCardinality() override = default;
+
+    String getName() const override { return name; }
+
+    size_t getNumberOfArguments() const override { return 1; }
+    bool isVariadic() const override { return true; }
+    bool isSuitableForShortCircuitArgumentsExecution(const 
DB::DataTypesWithConstInfo & /*arguments*/) const override { return true; }
+
+    DB::DataTypePtr getReturnTypeImpl(const DB::ColumnsWithTypeAndName & 
arguments) const override;
+
+    DB::ColumnPtr executeImpl(
+        const DB::ColumnsWithTypeAndName & arguments,
+        const DB::DataTypePtr & result_type,
+        const size_t /*input_rows_count*/) const override;
+
+    bool useDefaultImplementationForConstants() const override { return true; }
+};
+}
diff --git 
a/cpp-ch/local-engine/Parser/aggregate_function_parser/CommonAggregateFunctionParser.cpp
 
b/cpp-ch/local-engine/Parser/aggregate_function_parser/CommonAggregateFunctionParser.cpp
index ca448edb57..2369d6cdd7 100644
--- 
a/cpp-ch/local-engine/Parser/aggregate_function_parser/CommonAggregateFunctionParser.cpp
+++ 
b/cpp-ch/local-engine/Parser/aggregate_function_parser/CommonAggregateFunctionParser.cpp
@@ -20,6 +20,10 @@
 
 namespace local_engine
 {
+REGISTER_COMMON_AGGREGATE_FUNCTION_PARSER(KeBitmapOrCardinality, 
ke_bitmap_or_cardinality, ke_bitmap_or_cardinality)
+REGISTER_COMMON_AGGREGATE_FUNCTION_PARSER(KeBitmapOrData, ke_bitmap_or_data, 
ke_bitmap_or_data)
+REGISTER_COMMON_AGGREGATE_FUNCTION_PARSER(KeBitmapAndValue, 
ke_bitmap_and_value, ke_bitmap_and_value)
+REGISTER_COMMON_AGGREGATE_FUNCTION_PARSER(KeBitmapAndIds, ke_bitmap_and_ids, 
ke_bitmap_and_ids)
 
 REGISTER_COMMON_AGGREGATE_FUNCTION_PARSER(Sum, sum, sum)
 REGISTER_COMMON_AGGREGATE_FUNCTION_PARSER(Avg, avg, avg)
diff --git a/cpp-ch/local-engine/Storages/ReadBufferFromJavaBitmap.cpp 
b/cpp-ch/local-engine/Storages/ReadBufferFromJavaBitmap.cpp
new file mode 100644
index 0000000000..1bc532fee1
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/ReadBufferFromJavaBitmap.cpp
@@ -0,0 +1,53 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "ReadBufferFromJavaBitmap.h"
+#include <optional>
+#include <IO/ReadBuffer.h>
+#include <IO/WriteBufferFromString.h>
+#include <IO/WriteHelpers.h>
+
+namespace local_engine
+{
+
+ReadBufferFromJavaBitmap::ReadBufferFromJavaBitmap(char * original_ptr, size_t 
original_data_size) : DB::ReadBuffer(original_ptr, 0)
+{
+    original_size = original_data_size;
+    ch_bitmap_data_buffer = std::make_unique<WriteBufferFromOwnString>();
+    writeVarUInt(original_size + 3, *ch_bitmap_data_buffer);
+    Int64 map_size = 0;
+    map_size += map_size << 8 | static_cast<unsigned char>(original_ptr[1]);
+    map_size += map_size << 8 | static_cast<unsigned char>(original_ptr[2]);
+    map_size += map_size << 8 | static_cast<unsigned char>(original_ptr[3]);
+    map_size += map_size << 8 | static_cast<unsigned char>(original_ptr[4]);
+    writeBinary(map_size, *ch_bitmap_data_buffer);
+
+    BufferBase::set(ch_bitmap_data_buffer->str().data(), 
ch_bitmap_data_buffer->str().size(), 0);
+
+    java_bitmap_data = original_ptr + 5;
+}
+
+bool ReadBufferFromJavaBitmap::nextImpl()
+{
+    if (!read_header_finished)
+    {
+        read_header_finished = true;
+        BufferBase::set(java_bitmap_data, original_size - 5, 0);
+        return true;
+    }
+    return false;
+}
+}
diff --git a/cpp-ch/local-engine/Storages/ReadBufferFromJavaBitmap.h 
b/cpp-ch/local-engine/Storages/ReadBufferFromJavaBitmap.h
new file mode 100644
index 0000000000..fc06f65b88
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/ReadBufferFromJavaBitmap.h
@@ -0,0 +1,39 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#pragma once
+
+#include <IO/ReadBuffer.h>
+#include <IO/WriteBufferFromString.h>
+
+namespace local_engine
+{
+using namespace DB;
+
+class ReadBufferFromJavaBitmap : public ReadBuffer
+{
+public:
+    ReadBufferFromJavaBitmap(char * original_ptr, size_t original_data_size);
+
+private:
+    size_t original_size = 0;
+    Position java_bitmap_data;
+    std::unique_ptr<WriteBufferFromOwnString> ch_bitmap_data_buffer;
+    bool read_header_finished = false;
+
+    bool nextImpl() override;
+};
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to