This is an automated email from the ASF dual-hosted git repository. jmalkin pushed a commit to branch ebpps in repository https://gitbox.apache.org/repos/asf/datasketches-python.git
commit c26aecd28c407abe0a5590030c349f7218d09bc7 Author: Jon Malkin <[email protected]> AuthorDate: Thu Oct 19 14:35:12 2023 -0700 ebpps sampling -- note the dependence on origin/master rather than a release --- CMakeLists.txt | 3 +- src/datasketches.cpp | 2 + src/ebpps_wrapper.cpp | 108 +++++++++++++++++++++++++++++++++++++++++++++++ tests/ebpps_test.py | 113 ++++++++++++++++++++++++++++++++++++++++++++++++++ tests/vo_test.py | 4 +- 5 files changed, 227 insertions(+), 3 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 70a0f09..2f3a9c0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -101,6 +101,7 @@ target_sources(python src/theta_wrapper.cpp src/tuple_wrapper.cpp src/vo_wrapper.cpp + src/ebpps_wrapper.cpp src/req_wrapper.cpp src/quantiles_wrapper.cpp src/density_wrapper.cpp @@ -114,7 +115,7 @@ cmake_policy(SET CMP0097 NEW) include(ExternalProject) ExternalProject_Add(datasketches GIT_REPOSITORY https://github.com/apache/datasketches-cpp.git - GIT_TAG 4.1.0 + GIT_TAG origin/master GIT_SHALLOW true GIT_SUBMODULES "" INSTALL_DIR /tmp/datasketches diff --git a/src/datasketches.cpp b/src/datasketches.cpp index e360452..c2873f6 100644 --- a/src/datasketches.cpp +++ b/src/datasketches.cpp @@ -29,6 +29,7 @@ void init_cpc(py::module& m); void init_theta(py::module& m); void init_tuple(py::module& m); void init_vo(py::module& m); +void init_ebpps(py::module& m); void init_req(py::module& m); void init_quantiles(py::module& m); void init_count_min(py::module& m); @@ -47,6 +48,7 @@ PYBIND11_MODULE(_datasketches, m) { init_theta(m); init_tuple(m); init_vo(m); + init_ebpps(m); init_req(m); init_quantiles(m); init_count_min(m); diff --git a/src/ebpps_wrapper.cpp b/src/ebpps_wrapper.cpp new file mode 100644 index 0000000..3ea8b6b --- /dev/null +++ b/src/ebpps_wrapper.cpp @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "ebpps_sketch.hpp" +#include "py_serde.hpp" + +#include <pybind11/pybind11.h> +#include <pybind11/stl.h> + +namespace py = pybind11; + +namespace datasketches { + +namespace python { + +template<typename T> +ebpps_sketch<T> ebpps_sketch_deserialize(py::bytes& skBytes, py_object_serde& sd) { + std::string skStr = skBytes; // implicit cast + return ebpps_sketch<T>::deserialize(skStr.c_str(), skStr.length(), sd); +} + +template<typename T> +py::object ebpps_sketch_serialize(const ebpps_sketch<T>& sk, py_object_serde& sd) { + auto serResult = sk.serialize(0, sd); + return py::bytes((char*)serResult.data(), serResult.size()); +} + +template<typename T> +std::string ebpps_sketch_to_string(const ebpps_sketch<T>& sk, bool print_items) { + if (print_items) { + std::ostringstream ss; + ss << sk.to_string(); + ss << "### EBPPS Sketch Items" << std::endl; + int i = 0; + for (auto item : sk) { + // item is an arbitrary py::object, so get the value by + // using internal str() method then casting to C++ std::string + py::str item_pystr(item); + std::string item_str = py::cast<std::string>(item_pystr); + ss << i++ << ": " << item_str << std::endl; + } + return ss.str(); + } else { + return sk.to_string(); + } +} + +} +} + +namespace dspy = datasketches::python; + 
+template<typename T> +void bind_ebpps_sketch(py::module &m, const char* name) { + using namespace datasketches; + + py::class_<ebpps_sketch<T>>(m, name) + .def(py::init<uint32_t>(), py::arg("k")) + .def("__str__", &dspy::ebpps_sketch_to_string<T>, py::arg("print_items")=false, + "Produces a string summary of the sketch") + .def("to_string", &dspy::ebpps_sketch_to_string<T>, py::arg("print_items")=false, + "Produces a string summary of the sketch") + .def("update", (void (ebpps_sketch<T>::*)(const T&, double)) &ebpps_sketch<T>::update, py::arg("item"), py::arg("weight")=1.0, + "Updates the sketch with the given value and weight") + .def("merge", (void (ebpps_sketch<T>::*)(const ebpps_sketch<T>&)) &ebpps_sketch<T>::merge, + py::arg("sketch"), "Merges the sketch with the given sketch") + .def_property_readonly("k", &ebpps_sketch<T>::get_k, + "Returns the sketch's maximum configured sample size") + .def_property_readonly("n", &ebpps_sketch<T>::get_n, + "Returns the total stream length") + .def_property_readonly("c", &ebpps_sketch<T>::get_c, + "Returns the expected number of samples returned upon a call to get_result() or the creation of an iterator. " + "The number is a floating point value, where the fractional portion represents the probability of including " + "a \"partial item\" from the sample. 
The value C should be no larger than the sketch's configured value of k, " + "although numerical precision limitations mean it may exceed k by double precision floating point error margins in certain cases.") + .def("get_samples", &ebpps_sketch<T>::get_result, + "Returns the set of samples in the sketch") + .def("is_empty", &ebpps_sketch<T>::is_empty, + "Returns True if the sketch is empty, otherwise False") + .def("get_serialized_size_bytes", + [](const ebpps_sketch<T>& sk, py_object_serde& sd) { return sk.get_serialized_size_bytes(sd); }, + py::arg("serde"), + "Computes the size in bytes needed to serialize the current sketch") + .def("serialize", &dspy::ebpps_sketch_serialize<T>, py::arg("serde"), "Serialize the var opt sketch using the provided serde") + .def_static("deserialize", &dspy::ebpps_sketch_deserialize<T>, py::arg("bytes"), py::arg("serde"), + "Constructs a var opt sketch from the given bytes using the provided serde") + .def("__iter__", [](const ebpps_sketch<T>& sk) { return py::make_iterator(sk.begin(), sk.end()); }); +} + +void init_ebpps(py::module &m) { + bind_ebpps_sketch<py::object>(m, "ebpps_sketch"); +} diff --git a/tests/ebpps_test.py b/tests/ebpps_test.py new file mode 100644 index 0000000..fc3d7ce --- /dev/null +++ b/tests/ebpps_test.py @@ -0,0 +1,113 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +import unittest +from math import floor, ceil +from datasketches import ebpps_sketch, PyIntsSerDe, PyStringsSerDe + +class EbppsTest(unittest.TestCase): + def test_ebpps_example(self): + k = 50 # a small value so we can easily fill the sketch + sk = ebpps_sketch(k) + + # ebpps sampling reduces to standard reservoir sampling + # if the items are all equally weighted, although the + # algorithm will be significantly slower than an optimized + # reservoir sampler + n = 5 * k + for i in range(0, n): + sk.update(i) + + # we should have k samples since they're equally weighted + self.assertAlmostEqual(k, sk.c, places=12) + + # we can also add a heavy item, using a negative value to + # be able to identify the item later. Keep in mind that + # "heavy" is a relative concept, so using a fixed + # multiple of n may not be considered a heavy item for + # larger values of n + sk.update(-1, 1000 * n) + self.assertEqual(k, sk.k) + self.assertLess(sk.c, k) + self.assertEqual(n + 1, sk.n) + self.assertFalse(sk.is_empty()) + + # we can easily get the list of items in the sample + items = sk.get_samples() + self.assertLessEqual(len(items), k) + + count = 0 + for items in sk: + count = count + 1 + self.assertTrue(count == floor(sk.c) or count == ceil(sk.c)) + + # next we'll create a second, smaller sketch with + # only heavier items relative to the previous sketch + k2 = 5 + sk2 = ebpps_sketch(k2) + # for weight, use the sum of all items >=0 from before (n) + wt = n + for i in range(0, k2 + 1): + sk2.update((2 * n) + i, wt) + + # now merge the sketches, noting how the + # resulting k will be the smaller of the two. 
+ input_sk_c = sk.c + sk.merge(sk2) + + self.assertEqual(n + k2 + 2, sk.n) + self.assertFalse(sk.is_empty()) + self.assertEqual(sk.k, k2) + self.assertLess(sk.k, k) + + # the expected number of samples post-merge may be larger than + # with the input sketch + self.assertGreater(sk.c, input_sk_c) + + # we can dump a sumamry of information in the sketch + print(sk) + + # if we want to print the list of items, there must be a + # __str__() method for each item (which need not be the same + # type; they're all generic python objects when used from + # python), otherwise you may trigger an exception. + # to_string() is provided as a convenience to avoid direct + # calls to __str__() with parameters. + print(sk.to_string(True)) + + # finally, we can serialize the sketch by providing an + # appropriate serde class. + expected_size = sk.get_serialized_size_bytes(PyIntsSerDe()) + b = sk.serialize(PyIntsSerDe()) + self.assertEqual(expected_size, len(b)) + + # if we try to deserialize with the wrong serde, things break + try: + ebpps_sketch.deserialize(b, PyStringsSerDe()) + self.fail() + except: + # expected; do nothing + self.assertTrue(True) + + # using the correct serde gives us back a copy of the original + rebuilt = ebpps_sketch.deserialize(b, PyIntsSerDe()) + self.assertEqual(sk.k, rebuilt.k) + self.assertEqual(sk.c, rebuilt.c) + self.assertEqual(sk.n, rebuilt.n) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/vo_test.py b/tests/vo_test.py index 4fbca41..1e80b01 100644 --- a/tests/vo_test.py +++ b/tests/vo_test.py @@ -31,8 +31,8 @@ class VoTest(unittest.TestCase): for i in range(0, n): vo.update(i) - # we can also add a heavy item, using a negative weight for - # easy filtering later. keep in mind that "heavy" is a + # we can also add a heavy item, using a negative value for + # easy filtering later. 
Keep in mind that "heavy" is a # relative concept, so using a fixed multiple of n may not # be considered a heavy item for larger values of n vo.update(-1, 1000 * n) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
