AlexanderSaydakov commented on code in PR #332:
URL: https://github.com/apache/datasketches-cpp/pull/332#discussion_r1089277545


##########
theta/include/theta_sketch_impl.hpp:
##########
@@ -377,156 +372,355 @@ auto compact_theta_sketch_alloc<A>::serialize(unsigned 
header_size_bytes) const
   vector_bytes bytes(size, 0, entries_.get_allocator());
   uint8_t* ptr = bytes.data() + header_size_bytes;
 
-  ptr += copy_to_mem(preamble_longs, ptr);
-  const uint8_t serial_version = SERIAL_VERSION;
-  ptr += copy_to_mem(serial_version, ptr);
-  const uint8_t type = SKETCH_TYPE;
-  ptr += copy_to_mem(type, ptr);
+  *ptr++ = preamble_longs;
+  *ptr++ = UNCOMPRESSED_SERIAL_VERSION;
+  *ptr++ = SKETCH_TYPE;
   ptr += sizeof(uint16_t); // unused
   const uint8_t flags_byte(
     (1 << flags::IS_COMPACT) |
     (1 << flags::IS_READ_ONLY) |
     (this->is_empty() ? 1 << flags::IS_EMPTY : 0) |
     (this->is_ordered() ? 1 << flags::IS_ORDERED : 0)
   );
-  ptr += copy_to_mem(flags_byte, ptr);
-  const uint16_t seed_hash = get_seed_hash();
-  ptr += copy_to_mem(seed_hash, ptr);
+  *ptr++ = flags_byte;
+  ptr += copy_to_mem(get_seed_hash(), ptr);
   if (preamble_longs > 1) {
-    const uint32_t num_entries = static_cast<uint32_t>(entries_.size());
-    ptr += copy_to_mem(num_entries, ptr);
+    ptr += copy_to_mem<uint32_t>(entries_.size(), ptr);
     ptr += sizeof(uint32_t); // unused
   }
   if (this->is_estimation_mode()) ptr += copy_to_mem(theta_, ptr);
   if (entries_.size() > 0) ptr += copy_to_mem(entries_.data(), ptr, 
entries_.size() * sizeof(uint64_t));
   return bytes;
 }
 
+template<typename A>
+bool compact_theta_sketch_alloc<A>::is_suitable_for_compression() const {
+  if (!this->is_ordered() || entries_.size() == 0 ||
+        (entries_.size() == 1 && !this->is_estimation_mode())) return false;
+  return true;
+}
+
+template<typename A>
+void compact_theta_sketch_alloc<A>::serialize_compressed(std::ostream& os) 
const {
+  if (is_suitable_for_compression()) return serialize_version_4(os);
+  return serialize(os);
+}
+
+template<typename A>
+auto compact_theta_sketch_alloc<A>::serialize_compressed(unsigned 
header_size_bytes) const -> vector_bytes {
+  if (is_suitable_for_compression()) return 
serialize_version_4(header_size_bytes);
+  return serialize(header_size_bytes);
+}
+
+template<typename A>
+uint8_t compact_theta_sketch_alloc<A>::compute_min_leading_zeros() const {
+  // compression is based on leading zeros in deltas between ordered hash 
values
+  // assumes ordered sketch
+  uint64_t previous = 0;
+  uint64_t ored = 0;
+  for (const uint64_t entry: entries_) {
+    const uint64_t delta = entry - previous;
+    ored |= delta;
+    previous = entry;
+  }
+  return count_leading_zeros_in_u64(ored);
+}
+
+template<typename A>
+void compact_theta_sketch_alloc<A>::serialize_version_4(std::ostream& os) 
const {
+  const uint8_t preamble_longs = this->is_estimation_mode() ? 2 : 1;
+  const uint8_t min_entry_zeros = compute_min_leading_zeros();
+
+  // store num_entries as whole bytes since whole-byte blocks will follow 
(most probably)
+  const uint8_t num_entries_bytes = whole_bytes_to_hold_bits<uint8_t>(32 - 
count_leading_zeros_in_u32(entries_.size()));
+
+  write(os, preamble_longs);
+  write(os, COMPRESSED_SERIAL_VERSION);
+  write(os, SKETCH_TYPE);
+  write(os, min_entry_zeros);
+  write(os, num_entries_bytes);
+  const uint8_t flags_byte(
+    (1 << flags::IS_COMPACT) |
+    (1 << flags::IS_READ_ONLY) |
+    (1 << flags::IS_ORDERED)
+  );
+  write(os, flags_byte);
+  write(os, get_seed_hash());
+  if (this->is_estimation_mode()) write(os, this->theta_);
+  uint32_t num_entries = entries_.size();
+  for (unsigned i = 0; i < num_entries_bytes; ++i) {
+    write<uint8_t>(os, num_entries & 0xff);
+    num_entries >>= 8;
+  }
+
+  const uint8_t entry_bits = 64 - min_entry_zeros;
+  uint64_t previous = 0;
+  uint64_t deltas[8];
+  vector_bytes buffer(entry_bits, 0, entries_.get_allocator()); // block of 8 
entries takes entry_bits bytes
+
+  unsigned i;
+  for (i = 0; i + 7 < entries_.size(); i += 8) {
+    for (unsigned j = 0; j < 8; ++j) {
+      deltas[j] = entries_[i + j] - previous;
+      previous = entries_[i + j];
+    }
+    pack_bits_block8(deltas, buffer.data(), entry_bits);
+    write(os, buffer.data(), buffer.size());
+  }
+
+  uint8_t offset = 0;
+  uint8_t* ptr = buffer.data();
+  for (; i < entries_.size(); ++i) {
+    const uint64_t delta = entries_[i] - previous;
+    previous = entries_[i];
+    offset = pack_bits(delta, entry_bits, ptr, offset);
+  }
+  write(os, buffer.data(), ptr - buffer.data());
+}
+
+template<typename A>
+auto compact_theta_sketch_alloc<A>::serialize_version_4(unsigned 
header_size_bytes) const -> vector_bytes {
+  const uint8_t preamble_longs = this->is_estimation_mode() ? 2 : 1;
+  const uint8_t min_entry_zeros = compute_min_leading_zeros();
+  const size_t compressed_bits = (64 - min_entry_zeros) * entries_.size();
+
+  // store num_entries as whole bytes since whole-byte blocks will follow 
(most probably)
+  const uint8_t num_entries_bytes = whole_bytes_to_hold_bits<uint8_t>(32 - 
count_leading_zeros_in_u32(entries_.size()));
+
+  const size_t size = header_size_bytes + sizeof(uint64_t) * preamble_longs + 
num_entries_bytes
+      + whole_bytes_to_hold_bits(compressed_bits);
+  vector_bytes bytes(size, 0, entries_.get_allocator());
+  uint8_t* ptr = bytes.data() + header_size_bytes;
+
+  *ptr++ = preamble_longs;
+  *ptr++ = COMPRESSED_SERIAL_VERSION;
+  *ptr++ = SKETCH_TYPE;
+  *ptr++ = min_entry_zeros;
+  *ptr++ = num_entries_bytes;
+  const uint8_t flags_byte(
+    (1 << flags::IS_COMPACT) |
+    (1 << flags::IS_READ_ONLY) |
+    (1 << flags::IS_ORDERED)
+  );
+  *ptr++ = flags_byte;
+  ptr += copy_to_mem(get_seed_hash(), ptr);
+  if (this->is_estimation_mode()) {
+    ptr += copy_to_mem(theta_, ptr);
+  }
+  uint32_t num_entries = entries_.size();
+  for (unsigned i = 0; i < num_entries_bytes; ++i) {
+    *ptr++ = num_entries & 0xff;
+    num_entries >>= 8;
+  }
+
+  const uint8_t entry_bits = 64 - min_entry_zeros;
+  uint64_t previous = 0;
+  uint64_t deltas[8];
+
+  unsigned i;
+  for (i = 0; i + 7 < entries_.size(); i += 8) {
+    for (unsigned j = 0; j < 8; ++j) {
+      deltas[j] = entries_[i + j] - previous;
+      previous = entries_[i + j];
+    }
+    pack_bits_block8(deltas, ptr, entry_bits);
+    ptr += entry_bits;
+  }
+
+  uint8_t offset = 0;
+  for (; i < entries_.size(); ++i) {
+    const uint64_t delta = entries_[i] - previous;
+    previous = entries_[i];
+    offset = pack_bits(delta, entry_bits, ptr, offset);
+  }
+  return bytes;
+}
+
 template<typename A>
 compact_theta_sketch_alloc<A> 
compact_theta_sketch_alloc<A>::deserialize(std::istream& is, uint64_t seed, 
const A& allocator) {
   const auto preamble_longs = read<uint8_t>(is);
   const auto serial_version = read<uint8_t>(is);
   const auto type = read<uint8_t>(is);
+  checker<true>::check_sketch_type(type, SKETCH_TYPE);
   switch (serial_version) {
-  case SERIAL_VERSION: {
-      read<uint16_t>(is); // unused
-      const auto flags_byte = read<uint8_t>(is);
-      const auto seed_hash = read<uint16_t>(is);
-      checker<true>::check_sketch_type(type, SKETCH_TYPE);
-      checker<true>::check_serial_version(serial_version, SERIAL_VERSION);
-      const bool is_empty = flags_byte & (1 << flags::IS_EMPTY);
-      if (!is_empty) checker<true>::check_seed_hash(seed_hash, 
compute_seed_hash(seed));
-
-      uint64_t theta = theta_constants::MAX_THETA;
-      uint32_t num_entries = 0;
-      if (!is_empty) {
-        if (preamble_longs == 1) {
-          num_entries = 1;
-        } else {
-          num_entries = read<uint32_t>(is);
-          read<uint32_t>(is); // unused
-          if (preamble_longs > 2) {
-            theta = read<uint64_t>(is);
-          }
-        }
-      }
-      std::vector<uint64_t, A> entries(num_entries, 0, allocator);
-      if (!is_empty) read(is, entries.data(), sizeof(uint64_t) * 
entries.size());
+    case 4:
+      return deserialize_v4(preamble_longs, is, seed, allocator);
+    case 3:
+      return deserialize_v3(preamble_longs, is, seed, allocator);
+    case 1:

Review Comment:
   I preserved the order that was there before. maybe historical accident, 
maybe the thinking was to handle the most probable first (which may not really 
matter with the switch statement)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to