jmalkin commented on code in PR #332:
URL: https://github.com/apache/datasketches-cpp/pull/332#discussion_r1084489620


##########
common/include/memory_operations.hpp:
##########
@@ -54,14 +54,14 @@ static inline size_t copy_to_mem(const void* src, void* 
dst, size_t size) {
 }
 
 template<typename T>
-static inline size_t copy_to_mem(const T& item, void* dst) {
-  memcpy(dst, &item, sizeof(T));
+static inline size_t copy_from_mem(const void* src, T& item) {
+  memcpy(&item, src, sizeof(T));
   return sizeof(T);
 }
 
 template<typename T>
-static inline size_t copy_from_mem(const void* src, T& item) {
-  memcpy(&item, src, sizeof(T));
+static inline size_t copy_to_mem(T item, void* dst) {

Review Comment:
   Why are we no longer declaring `item` as `const T&`?



##########
theta/include/theta_sketch.hpp:
##########
@@ -355,6 +357,25 @@ class compact_theta_sketch_alloc: public 
theta_sketch_alloc<Allocator> {
    */
   vector_bytes serialize(unsigned header_size_bytes = 0) const;
 
+  /**
+   * This method serializes the sketch into a given stream in a compressed 
binary form.
+   * Compression is applied to ordered sketches except empty and single item.
+   * For unordered, empty and single item sketches this method is equivalent 
to serialize()
+   * @param os output stream
+   */
+  void serialize_compressed(std::ostream& os) const;

Review Comment:
   Is there a reason we don't order and then compress unordered sketches? 
If the caller explicitly asks for compressed serialization, that seems like a 
reasonable thing to do.



##########
theta/include/theta_sketch_impl.hpp:
##########
@@ -377,156 +372,355 @@ auto compact_theta_sketch_alloc<A>::serialize(unsigned 
header_size_bytes) const
   vector_bytes bytes(size, 0, entries_.get_allocator());
   uint8_t* ptr = bytes.data() + header_size_bytes;
 
-  ptr += copy_to_mem(preamble_longs, ptr);
-  const uint8_t serial_version = SERIAL_VERSION;
-  ptr += copy_to_mem(serial_version, ptr);
-  const uint8_t type = SKETCH_TYPE;
-  ptr += copy_to_mem(type, ptr);
+  *ptr++ = preamble_longs;
+  *ptr++ = UNCOMPRESSED_SERIAL_VERSION;
+  *ptr++ = SKETCH_TYPE;
   ptr += sizeof(uint16_t); // unused
   const uint8_t flags_byte(
     (1 << flags::IS_COMPACT) |
     (1 << flags::IS_READ_ONLY) |
     (this->is_empty() ? 1 << flags::IS_EMPTY : 0) |
     (this->is_ordered() ? 1 << flags::IS_ORDERED : 0)
   );
-  ptr += copy_to_mem(flags_byte, ptr);
-  const uint16_t seed_hash = get_seed_hash();
-  ptr += copy_to_mem(seed_hash, ptr);
+  *ptr++ = flags_byte;
+  ptr += copy_to_mem(get_seed_hash(), ptr);
   if (preamble_longs > 1) {
-    const uint32_t num_entries = static_cast<uint32_t>(entries_.size());
-    ptr += copy_to_mem(num_entries, ptr);
+    ptr += copy_to_mem<uint32_t>(entries_.size(), ptr);
     ptr += sizeof(uint32_t); // unused
   }
   if (this->is_estimation_mode()) ptr += copy_to_mem(theta_, ptr);
   if (entries_.size() > 0) ptr += copy_to_mem(entries_.data(), ptr, 
entries_.size() * sizeof(uint64_t));
   return bytes;
 }
 
+template<typename A>
+bool compact_theta_sketch_alloc<A>::is_suitable_for_compression() const {
+  if (!this->is_ordered() || entries_.size() == 0 ||
+        (entries_.size() == 1 && !this->is_estimation_mode())) return false;
+  return true;
+}
+
+template<typename A>
+void compact_theta_sketch_alloc<A>::serialize_compressed(std::ostream& os) 
const {
+  if (is_suitable_for_compression()) return serialize_version_4(os);
+  return serialize(os);
+}
+
+template<typename A>
+auto compact_theta_sketch_alloc<A>::serialize_compressed(unsigned 
header_size_bytes) const -> vector_bytes {
+  if (is_suitable_for_compression()) return 
serialize_version_4(header_size_bytes);
+  return serialize(header_size_bytes);
+}
+
+template<typename A>
+uint8_t compact_theta_sketch_alloc<A>::compute_min_leading_zeros() const {
+  // compression is based on leading zeros in deltas between ordered hash 
values
+  // assumes ordered sketch
+  uint64_t previous = 0;
+  uint64_t ored = 0;
+  for (const uint64_t entry: entries_) {
+    const uint64_t delta = entry - previous;
+    ored |= delta;
+    previous = entry;
+  }
+  return count_leading_zeros_in_u64(ored);
+}
+
+template<typename A>
+void compact_theta_sketch_alloc<A>::serialize_version_4(std::ostream& os) 
const {
+  const uint8_t preamble_longs = this->is_estimation_mode() ? 2 : 1;
+  const uint8_t min_entry_zeros = compute_min_leading_zeros();
+
+  // store num_entries as whole bytes since whole-byte blocks will follow 
(most probably)
+  const uint8_t num_entries_bytes = whole_bytes_to_hold_bits<uint8_t>(32 - 
count_leading_zeros_in_u32(entries_.size()));
+
+  write(os, preamble_longs);
+  write(os, COMPRESSED_SERIAL_VERSION);
+  write(os, SKETCH_TYPE);
+  write(os, min_entry_zeros);
+  write(os, num_entries_bytes);
+  const uint8_t flags_byte(
+    (1 << flags::IS_COMPACT) |
+    (1 << flags::IS_READ_ONLY) |
+    (1 << flags::IS_ORDERED)
+  );
+  write(os, flags_byte);
+  write(os, get_seed_hash());
+  if (this->is_estimation_mode()) write(os, this->theta_);
+  uint32_t num_entries = entries_.size();
+  for (unsigned i = 0; i < num_entries_bytes; ++i) {
+    write<uint8_t>(os, num_entries & 0xff);
+    num_entries >>= 8;
+  }
+
+  const uint8_t entry_bits = 64 - min_entry_zeros;
+  uint64_t previous = 0;
+  uint64_t deltas[8];
+  vector_bytes buffer(entry_bits, 0, entries_.get_allocator()); // block of 8 
entries takes entry_bits bytes
+
+  unsigned i;

Review Comment:
   It might be useful to have high-level notes in comments here. I think the 
idea is that blocks of 8 entries ensure we'll be byte-aligned, so we do chunks 
of 8 and then have special handling for the remainder?



##########
theta/include/theta_sketch_impl.hpp:
##########
@@ -377,156 +372,355 @@ auto compact_theta_sketch_alloc<A>::serialize(unsigned 
header_size_bytes) const
   vector_bytes bytes(size, 0, entries_.get_allocator());
   uint8_t* ptr = bytes.data() + header_size_bytes;
 
-  ptr += copy_to_mem(preamble_longs, ptr);
-  const uint8_t serial_version = SERIAL_VERSION;
-  ptr += copy_to_mem(serial_version, ptr);
-  const uint8_t type = SKETCH_TYPE;
-  ptr += copy_to_mem(type, ptr);
+  *ptr++ = preamble_longs;
+  *ptr++ = UNCOMPRESSED_SERIAL_VERSION;
+  *ptr++ = SKETCH_TYPE;
   ptr += sizeof(uint16_t); // unused
   const uint8_t flags_byte(
     (1 << flags::IS_COMPACT) |
     (1 << flags::IS_READ_ONLY) |
     (this->is_empty() ? 1 << flags::IS_EMPTY : 0) |
     (this->is_ordered() ? 1 << flags::IS_ORDERED : 0)
   );
-  ptr += copy_to_mem(flags_byte, ptr);
-  const uint16_t seed_hash = get_seed_hash();
-  ptr += copy_to_mem(seed_hash, ptr);
+  *ptr++ = flags_byte;
+  ptr += copy_to_mem(get_seed_hash(), ptr);

Review Comment:
   Switching so that we mix `*ptr++` and `ptr += copy_to_mem()` did not 
improve readability.



##########
theta/include/theta_sketch_impl.hpp:
##########
@@ -377,156 +372,355 @@ auto compact_theta_sketch_alloc<A>::serialize(unsigned 
header_size_bytes) const
   vector_bytes bytes(size, 0, entries_.get_allocator());
   uint8_t* ptr = bytes.data() + header_size_bytes;
 
-  ptr += copy_to_mem(preamble_longs, ptr);
-  const uint8_t serial_version = SERIAL_VERSION;
-  ptr += copy_to_mem(serial_version, ptr);
-  const uint8_t type = SKETCH_TYPE;
-  ptr += copy_to_mem(type, ptr);
+  *ptr++ = preamble_longs;
+  *ptr++ = UNCOMPRESSED_SERIAL_VERSION;
+  *ptr++ = SKETCH_TYPE;
   ptr += sizeof(uint16_t); // unused
   const uint8_t flags_byte(
     (1 << flags::IS_COMPACT) |
     (1 << flags::IS_READ_ONLY) |
     (this->is_empty() ? 1 << flags::IS_EMPTY : 0) |
     (this->is_ordered() ? 1 << flags::IS_ORDERED : 0)
   );
-  ptr += copy_to_mem(flags_byte, ptr);
-  const uint16_t seed_hash = get_seed_hash();
-  ptr += copy_to_mem(seed_hash, ptr);
+  *ptr++ = flags_byte;
+  ptr += copy_to_mem(get_seed_hash(), ptr);
   if (preamble_longs > 1) {
-    const uint32_t num_entries = static_cast<uint32_t>(entries_.size());
-    ptr += copy_to_mem(num_entries, ptr);
+    ptr += copy_to_mem<uint32_t>(entries_.size(), ptr);
     ptr += sizeof(uint32_t); // unused
   }
   if (this->is_estimation_mode()) ptr += copy_to_mem(theta_, ptr);
   if (entries_.size() > 0) ptr += copy_to_mem(entries_.data(), ptr, 
entries_.size() * sizeof(uint64_t));
   return bytes;
 }
 
+template<typename A>
+bool compact_theta_sketch_alloc<A>::is_suitable_for_compression() const {
+  if (!this->is_ordered() || entries_.size() == 0 ||
+        (entries_.size() == 1 && !this->is_estimation_mode())) return false;
+  return true;
+}
+
+template<typename A>
+void compact_theta_sketch_alloc<A>::serialize_compressed(std::ostream& os) 
const {
+  if (is_suitable_for_compression()) return serialize_version_4(os);
+  return serialize(os);
+}
+
+template<typename A>
+auto compact_theta_sketch_alloc<A>::serialize_compressed(unsigned 
header_size_bytes) const -> vector_bytes {
+  if (is_suitable_for_compression()) return 
serialize_version_4(header_size_bytes);
+  return serialize(header_size_bytes);
+}
+
+template<typename A>
+uint8_t compact_theta_sketch_alloc<A>::compute_min_leading_zeros() const {
+  // compression is based on leading zeros in deltas between ordered hash 
values
+  // assumes ordered sketch
+  uint64_t previous = 0;
+  uint64_t ored = 0;
+  for (const uint64_t entry: entries_) {
+    const uint64_t delta = entry - previous;
+    ored |= delta;
+    previous = entry;
+  }
+  return count_leading_zeros_in_u64(ored);
+}
+
+template<typename A>
+void compact_theta_sketch_alloc<A>::serialize_version_4(std::ostream& os) 
const {
+  const uint8_t preamble_longs = this->is_estimation_mode() ? 2 : 1;
+  const uint8_t min_entry_zeros = compute_min_leading_zeros();
+
+  // store num_entries as whole bytes since whole-byte blocks will follow 
(most probably)
+  const uint8_t num_entries_bytes = whole_bytes_to_hold_bits<uint8_t>(32 - 
count_leading_zeros_in_u32(entries_.size()));
+
+  write(os, preamble_longs);
+  write(os, COMPRESSED_SERIAL_VERSION);
+  write(os, SKETCH_TYPE);
+  write(os, min_entry_zeros);
+  write(os, num_entries_bytes);
+  const uint8_t flags_byte(
+    (1 << flags::IS_COMPACT) |
+    (1 << flags::IS_READ_ONLY) |
+    (1 << flags::IS_ORDERED)
+  );
+  write(os, flags_byte);
+  write(os, get_seed_hash());
+  if (this->is_estimation_mode()) write(os, this->theta_);
+  uint32_t num_entries = entries_.size();
+  for (unsigned i = 0; i < num_entries_bytes; ++i) {
+    write<uint8_t>(os, num_entries & 0xff);
+    num_entries >>= 8;
+  }
+
+  const uint8_t entry_bits = 64 - min_entry_zeros;
+  uint64_t previous = 0;
+  uint64_t deltas[8];
+  vector_bytes buffer(entry_bits, 0, entries_.get_allocator()); // block of 8 
entries takes entry_bits bytes
+
+  unsigned i;
+  for (i = 0; i + 7 < entries_.size(); i += 8) {
+    for (unsigned j = 0; j < 8; ++j) {
+      deltas[j] = entries_[i + j] - previous;
+      previous = entries_[i + j];
+    }
+    pack_bits_block8(deltas, buffer.data(), entry_bits);
+    write(os, buffer.data(), buffer.size());
+  }
+
+  uint8_t offset = 0;
+  uint8_t* ptr = buffer.data();
+  for (; i < entries_.size(); ++i) {
+    const uint64_t delta = entries_[i] - previous;
+    previous = entries_[i];
+    offset = pack_bits(delta, entry_bits, ptr, offset);
+  }
+  write(os, buffer.data(), ptr - buffer.data());
+}
+
+template<typename A>
+auto compact_theta_sketch_alloc<A>::serialize_version_4(unsigned 
header_size_bytes) const -> vector_bytes {
+  const uint8_t preamble_longs = this->is_estimation_mode() ? 2 : 1;
+  const uint8_t min_entry_zeros = compute_min_leading_zeros();
+  const size_t compressed_bits = (64 - min_entry_zeros) * entries_.size();
+
+  // store num_entries as whole bytes since whole-byte blocks will follow 
(most probably)
+  const uint8_t num_entries_bytes = whole_bytes_to_hold_bits<uint8_t>(32 - 
count_leading_zeros_in_u32(entries_.size()));
+
+  const size_t size = header_size_bytes + sizeof(uint64_t) * preamble_longs + 
num_entries_bytes
+      + whole_bytes_to_hold_bits(compressed_bits);
+  vector_bytes bytes(size, 0, entries_.get_allocator());
+  uint8_t* ptr = bytes.data() + header_size_bytes;
+
+  *ptr++ = preamble_longs;
+  *ptr++ = COMPRESSED_SERIAL_VERSION;
+  *ptr++ = SKETCH_TYPE;
+  *ptr++ = min_entry_zeros;
+  *ptr++ = num_entries_bytes;
+  const uint8_t flags_byte(
+    (1 << flags::IS_COMPACT) |
+    (1 << flags::IS_READ_ONLY) |
+    (1 << flags::IS_ORDERED)
+  );
+  *ptr++ = flags_byte;
+  ptr += copy_to_mem(get_seed_hash(), ptr);
+  if (this->is_estimation_mode()) {
+    ptr += copy_to_mem(theta_, ptr);
+  }
+  uint32_t num_entries = entries_.size();
+  for (unsigned i = 0; i < num_entries_bytes; ++i) {
+    *ptr++ = num_entries & 0xff;
+    num_entries >>= 8;
+  }
+
+  const uint8_t entry_bits = 64 - min_entry_zeros;
+  uint64_t previous = 0;
+  uint64_t deltas[8];
+
+  unsigned i;
+  for (i = 0; i + 7 < entries_.size(); i += 8) {
+    for (unsigned j = 0; j < 8; ++j) {
+      deltas[j] = entries_[i + j] - previous;
+      previous = entries_[i + j];
+    }
+    pack_bits_block8(deltas, ptr, entry_bits);
+    ptr += entry_bits;
+  }
+
+  uint8_t offset = 0;
+  for (; i < entries_.size(); ++i) {
+    const uint64_t delta = entries_[i] - previous;
+    previous = entries_[i];
+    offset = pack_bits(delta, entry_bits, ptr, offset);
+  }
+  return bytes;
+}
+
 template<typename A>
 compact_theta_sketch_alloc<A> 
compact_theta_sketch_alloc<A>::deserialize(std::istream& is, uint64_t seed, 
const A& allocator) {
   const auto preamble_longs = read<uint8_t>(is);
   const auto serial_version = read<uint8_t>(is);
   const auto type = read<uint8_t>(is);
+  checker<true>::check_sketch_type(type, SKETCH_TYPE);
   switch (serial_version) {
-  case SERIAL_VERSION: {
-      read<uint16_t>(is); // unused
-      const auto flags_byte = read<uint8_t>(is);
-      const auto seed_hash = read<uint16_t>(is);
-      checker<true>::check_sketch_type(type, SKETCH_TYPE);
-      checker<true>::check_serial_version(serial_version, SERIAL_VERSION);
-      const bool is_empty = flags_byte & (1 << flags::IS_EMPTY);
-      if (!is_empty) checker<true>::check_seed_hash(seed_hash, 
compute_seed_hash(seed));
-
-      uint64_t theta = theta_constants::MAX_THETA;
-      uint32_t num_entries = 0;
-      if (!is_empty) {
-        if (preamble_longs == 1) {
-          num_entries = 1;
-        } else {
-          num_entries = read<uint32_t>(is);
-          read<uint32_t>(is); // unused
-          if (preamble_longs > 2) {
-            theta = read<uint64_t>(is);
-          }
-        }
-      }
-      std::vector<uint64_t, A> entries(num_entries, 0, allocator);
-      if (!is_empty) read(is, entries.data(), sizeof(uint64_t) * 
entries.size());
+    case 4:
+      return deserialize_v4(preamble_longs, is, seed, allocator);
+    case 3:
+      return deserialize_v3(preamble_longs, is, seed, allocator);
+    case 1:

Review Comment:
   Just curious why 1 comes before 2?



##########
theta/include/bit_packing.hpp:
##########
@@ -0,0 +1,6279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef BIT_PACKING_HPP_

Review Comment:
   I'm largely relying on the unit tests to ensure this code is correct.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to