cryptoe commented on code in PR #16269:
URL: https://github.com/apache/druid/pull/16269#discussion_r1615538326


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/WorkerChatHandler.java:
##########
@@ -289,4 +309,10 @@ public Response httpGetCounters(@Context final 
HttpServletRequest req)
     ChatHandlers.authorizationCheck(req, Action.WRITE, task.getDataSource(), 
toolbox.getAuthorizerMapper());
     return 
Response.status(Response.Status.OK).entity(worker.getCounters()).build();
   }
+
+  public enum SketchEncoding
+  {
+    OCTET_STREAM,

Review Comment:
   Nit: let's add some documentation about the sketch encoding here. 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/serde/ClusterByStatisticsSnapshotSerde.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.statistics.serde;
+
+import org.apache.druid.frame.key.RowKey;
+import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
+import org.apache.druid.msq.statistics.KeyCollectorSnapshot;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Handles the serialization and deserialization of {@link 
ClusterByStatisticsSnapshot}, into a byte array.
+ */
+public class ClusterByStatisticsSnapshotSerde
+{
+  /**
+   * Deserializes the {@link ClusterByStatisticsSnapshot} and writes it to the 
{@link OutputStream}.
+   * <br>
+   * Format:
+   * - 1 byte : Header byte
+   * - 4 bytes: Number of buckets
+   * - 4 bytes: Number of entries in {@link 
ClusterByStatisticsSnapshot#getHasMultipleValues()}
+   * - 4 * number of multivalue bucket bytes: List of integers
+   * - number of buckets * buckets, serailized by {@link 
#serializeBucket(OutputStream, ClusterByStatisticsSnapshot.Bucket)}
+   */
+  public static void serialize(OutputStream outputStream, 
ClusterByStatisticsSnapshot snapshot) throws IOException
+  {
+    final Map<Long, ClusterByStatisticsSnapshot.Bucket> buckets = 
snapshot.getBuckets();
+    final Set<Integer> multipleValueBuckets = snapshot.getHasMultipleValues();
+
+    // Write a header byte, to be used to contain any metadata in the future.
+    outputStream.write(0x0);

Review Comment:
   Let's define this as a static variable. 
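   The suggestion could look something like this standalone sketch (class and method names are illustrative, not from the PR):

   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.OutputStream;

   public class HeaderByteSketch
   {
     // Named constant for the reserved header byte (currently 0x0); it can
     // carry a version or other metadata later without a magic literal.
     public static final byte HEADER_BYTE = 0x0;

     public static void writeHeader(OutputStream outputStream) throws IOException
     {
       outputStream.write(HEADER_BYTE);
     }
   }
   ```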



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/serde/ClusterByStatisticsSnapshotSerde.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.statistics.serde;
+
+import org.apache.druid.frame.key.RowKey;
+import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
+import org.apache.druid.msq.statistics.KeyCollectorSnapshot;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Handles the serialization and deserialization of {@link 
ClusterByStatisticsSnapshot}, into a byte array.
+ */
+public class ClusterByStatisticsSnapshotSerde
+{
+  /**
+   * Deserializes the {@link ClusterByStatisticsSnapshot} and writes it to the 
{@link OutputStream}.
+   * <br>
+   * Format:
+   * - 1 byte : Header byte
+   * - 4 bytes: Number of buckets
+   * - 4 bytes: Number of entries in {@link 
ClusterByStatisticsSnapshot#getHasMultipleValues()}
+   * - 4 * number of multivalue bucket bytes: List of integers
+   * - number of buckets * buckets, serailized by {@link 
#serializeBucket(OutputStream, ClusterByStatisticsSnapshot.Bucket)}
+   */
+  public static void serialize(OutputStream outputStream, 
ClusterByStatisticsSnapshot snapshot) throws IOException
+  {
+    final Map<Long, ClusterByStatisticsSnapshot.Bucket> buckets = 
snapshot.getBuckets();
+    final Set<Integer> multipleValueBuckets = snapshot.getHasMultipleValues();
+
+    // Write a header byte, to be used to contain any metadata in the future.
+    outputStream.write(0x0);
+
+    writeIntToStream(outputStream, buckets.size());
+    ByteBuffer multivalueBuffer = ByteBuffer.allocate(Integer.BYTES + 
multipleValueBuckets.size())
+                                            
.putInt(multipleValueBuckets.size());
+    multipleValueBuckets.forEach(multivalueBuffer::putInt);
+    outputStream.write(multivalueBuffer.array());
+
+    for (Map.Entry<Long, ClusterByStatisticsSnapshot.Bucket> entry : 
buckets.entrySet()) {
+      writeLongToStream(outputStream, entry.getKey());
+      serializeBucket(outputStream, entry.getValue());
+    }
+  }
+
+  private static final int HEADER_OFFSET = 0;
+  private static final int BUCKET_COUNT_OFFSET = HEADER_OFFSET + Byte.BYTES;
+  private static final int MV_SET_SIZE_OFFSET = BUCKET_COUNT_OFFSET + 
Integer.BYTES;
+  private static final int MV_VALUES_OFFSET = MV_SET_SIZE_OFFSET + 
Integer.BYTES;
+
+  private static final int TIMECHUNK_OFFSET = 0;
+  private static final int BUCKET_SIZE_OFFSET = TIMECHUNK_OFFSET + Long.BYTES;
+  private static final int BUCKET_OFFSET = BUCKET_SIZE_OFFSET + Integer.BYTES;
+
+  public static ClusterByStatisticsSnapshot deserialize(ByteBuffer byteBuffer)
+  {
+    int position = byteBuffer.position();
+
+    final int bucketCount = byteBuffer.getInt(position + BUCKET_COUNT_OFFSET);
+    final int mvSetSize = byteBuffer.getInt(position + MV_SET_SIZE_OFFSET);
+
+    final Set<Integer> hasMultiValues = new HashSet<>();
+    for (int offset = position + MV_VALUES_OFFSET; offset < position + 
mvSetSize * Integer.BYTES; offset += Integer.BYTES) {
+      hasMultiValues.add(byteBuffer.getInt(offset));
+    }
+
+    final Map<Long, ClusterByStatisticsSnapshot.Bucket> buckets = new 
HashMap<>();
+
+    // Move the buffer position
+    int nextBucket = position + MV_VALUES_OFFSET + Integer.BYTES * mvSetSize;
+
+    for (int bucketNo = 0; bucketNo < bucketCount; bucketNo++) {
+      position = byteBuffer.position(nextBucket).position();
+
+      final long timeChunk = byteBuffer.getLong(position + TIMECHUNK_OFFSET);
+      final int snapshotSize = byteBuffer.getInt(position + 
BUCKET_SIZE_OFFSET);
+
+      final ByteBuffer duplicate = (ByteBuffer) byteBuffer.duplicate()
+                                                          
.order(byteBuffer.order())
+                                                          .position(position + 
BUCKET_OFFSET)
+                                                          .limit(position + 
BUCKET_OFFSET + snapshotSize);
+
+      ClusterByStatisticsSnapshot.Bucket bucket = deserializeBucket(duplicate);
+      buckets.put(timeChunk, bucket);
+
+      nextBucket = position + BUCKET_OFFSET + snapshotSize;
+    }
+
+    return new ClusterByStatisticsSnapshot(buckets, hasMultiValues);
+  }
+
+  /**
+   * Format:
+   * - 8 bytes: bytesRetained
+   * - 4 bytes: keyArray length
+   * - 4 bytes: snapshot length
+   * - keyArray length bytes: serialized key array
+   * - snapshot length bytes: serialized snapshot
+   */
+  static void serializeBucket(OutputStream outputStream, 
ClusterByStatisticsSnapshot.Bucket bucket) throws IOException
+  {
+    final byte[] bucketKeyArray = bucket.getBucketKey().array();
+    final double bytesRetained = bucket.getBytesRetained();
+
+    final KeyCollectorSnapshot snapshot = bucket.getKeyCollectorSnapshot();
+    final byte[] serializedSnapshot = 
snapshot.getSerializer().serialize(snapshot);
+
+    final int length = Double.BYTES + 2 * Integer.BYTES + 
bucketKeyArray.length + serializedSnapshot.length;

Review Comment:
   Let's add a comment next to each value explaining what it is. 
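   For example, something like this standalone sketch, which spells out each term of the serialized bucket length (the method name is illustrative, not from the PR):

   ```java
   public class BucketLengthSketch
   {
     // Each term of the serialized bucket length, commented per field.
     public static int bucketLength(int keyArrayLength, int snapshotLength)
     {
       return Double.BYTES       // bytesRetained
              + Integer.BYTES    // keyArray length field
              + Integer.BYTES    // snapshot length field
              + keyArrayLength   // serialized key array bytes
              + snapshotLength;  // serialized snapshot bytes
     }
   }
   ```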



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/serde/QuantilesSnapshotSerializer.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.statistics.serde;
+
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.statistics.KeyCollectorSnapshot;
+import org.apache.druid.msq.statistics.QuantilesSketchKeyCollectorSnapshot;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Format:
+ * - 8 bytes: {@link QuantilesSketchKeyCollectorSnapshot#getAverageKeyLength()}
+ * - 4 bytes: length of quantile sketch snapshot
+ * - length bytes: the sketch snapshot

Review Comment:
   ```suggestion
    * - n bytes: the sketch snapshot
   ```



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/serde/KeyCollectorSnapshotSerializer.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.statistics.serde;
+
+import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
+import org.apache.druid.msq.statistics.KeyCollectorSnapshot;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Serializes a {@link ClusterByStatisticsSnapshot} into a byte[].
+ */
+public abstract class KeyCollectorSnapshotSerializer
+{
+  protected abstract byte getType();
+
+  protected abstract byte[] serializeKeyCollector(KeyCollectorSnapshot 
collectorSnapshot);
+
+  public byte[] serialize(KeyCollectorSnapshot collectorSnapshot)

Review Comment:
   Please add test cases for null and empty collectors, for all the serde methods. 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/serde/ClusterByStatisticsSnapshotSerde.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.statistics.serde;
+
+import org.apache.druid.frame.key.RowKey;
+import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
+import org.apache.druid.msq.statistics.KeyCollectorSnapshot;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Handles the serialization and deserialization of {@link 
ClusterByStatisticsSnapshot}, into a byte array.
+ */
+public class ClusterByStatisticsSnapshotSerde
+{
+  /**
+   * Deserializes the {@link ClusterByStatisticsSnapshot} and writes it to the 
{@link OutputStream}.
+   * <br>
+   * Format:
+   * - 1 byte : Header byte

Review Comment:
   Let's mention this can be used for versioning. 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/serde/ClusterByStatisticsSnapshotSerde.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.statistics.serde;
+
+import org.apache.druid.frame.key.RowKey;
+import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
+import org.apache.druid.msq.statistics.KeyCollectorSnapshot;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Handles the serialization and deserialization of {@link 
ClusterByStatisticsSnapshot}, into a byte array.
+ */
+public class ClusterByStatisticsSnapshotSerde
+{
+  /**
+   * Deserializes the {@link ClusterByStatisticsSnapshot} and writes it to the 
{@link OutputStream}.
+   * <br>
+   * Format:
+   * - 1 byte : Header byte
+   * - 4 bytes: Number of buckets
+   * - 4 bytes: Number of entries in {@link 
ClusterByStatisticsSnapshot#getHasMultipleValues()}
+   * - 4 * number of multivalue bucket bytes: List of integers
+   * - number of buckets * buckets, serailized by {@link 
#serializeBucket(OutputStream, ClusterByStatisticsSnapshot.Bucket)}

Review Comment:
   I did not understand why you would need numberOfBuckets * buckets. 
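   The last format line appears to mean that the bucket section simply repeats once per bucket: an 8-byte time chunk key followed by that bucket's serialized bytes, bucketCount times. A standalone sketch, with a raw byte[] payload standing in for the real serialized bucket:

   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.util.Map;

   public class BucketSectionSketch
   {
     // Writes one (timeChunk, bucket) record per map entry; the byte[] value
     // is a hypothetical stand-in for the real serialized bucket.
     public static byte[] serializeBuckets(Map<Long, byte[]> buckets) throws IOException
     {
       ByteArrayOutputStream out = new ByteArrayOutputStream();
       for (Map.Entry<Long, byte[]> entry : buckets.entrySet()) {
         out.write(ByteBuffer.allocate(Long.BYTES).putLong(entry.getKey()).array()); // time chunk key
         out.write(entry.getValue()); // serialized bucket bytes
       }
       return out.toByteArray();
     }
   }
   ```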



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/serde/KeyCollectorSnapshotSerializer.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.statistics.serde;
+
+import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
+import org.apache.druid.msq.statistics.KeyCollectorSnapshot;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Serializes a {@link ClusterByStatisticsSnapshot} into a byte[].
+ */
+public abstract class KeyCollectorSnapshotSerializer
+{
+  protected abstract byte getType();
+
+  protected abstract byte[] serializeKeyCollector(KeyCollectorSnapshot 
collectorSnapshot);
+
+  public byte[] serialize(KeyCollectorSnapshot collectorSnapshot)

Review Comment:
   You should also mention here that the type cannot be the same across the various implementations. 
   Javadocs for the serializer above would be helpful. 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/serde/ClusterByStatisticsSnapshotSerde.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.statistics.serde;
+
+import org.apache.druid.frame.key.RowKey;
+import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
+import org.apache.druid.msq.statistics.KeyCollectorSnapshot;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Handles the serialization and deserialization of {@link 
ClusterByStatisticsSnapshot}, into a byte array.
+ */
+public class ClusterByStatisticsSnapshotSerde
+{
+  /**
+   * Deserializes the {@link ClusterByStatisticsSnapshot} and writes it to the 
{@link OutputStream}.

Review Comment:
   We could also put keys in one part and values in another; that way we can always skip to the correct value faster.
   Since we are not persisting this payload yet, I think it's fine, but if we have to in the future we can always circle back on this. 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/serde/ClusterByStatisticsSnapshotSerde.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.statistics.serde;
+
+import org.apache.druid.frame.key.RowKey;
+import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
+import org.apache.druid.msq.statistics.KeyCollectorSnapshot;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Handles the serialization and deserialization of {@link 
ClusterByStatisticsSnapshot}, into a byte array.
+ */
+public class ClusterByStatisticsSnapshotSerde
+{
+  /**
+   * Deserializes the {@link ClusterByStatisticsSnapshot} and writes it to the 
{@link OutputStream}.
+   * <br>
+   * Format:
+   * - 1 byte : Header byte
+   * - 4 bytes: Number of buckets
+   * - 4 bytes: Number of entries in {@link 
ClusterByStatisticsSnapshot#getHasMultipleValues()}
+   * - 4 * number of multivalue bucket bytes: List of integers
+   * - number of buckets * buckets, serailized by {@link 
#serializeBucket(OutputStream, ClusterByStatisticsSnapshot.Bucket)}
+   */
+  public static void serialize(OutputStream outputStream, 
ClusterByStatisticsSnapshot snapshot) throws IOException
+  {
+    final Map<Long, ClusterByStatisticsSnapshot.Bucket> buckets = 
snapshot.getBuckets();
+    final Set<Integer> multipleValueBuckets = snapshot.getHasMultipleValues();
+
+    // Write a header byte, to be used to contain any metadata in the future.
+    outputStream.write(0x0);
+
+    writeIntToStream(outputStream, buckets.size());
+    ByteBuffer multivalueBuffer = ByteBuffer.allocate(Integer.BYTES + 
multipleValueBuckets.size())
+                                            
.putInt(multipleValueBuckets.size());
+    multipleValueBuckets.forEach(multivalueBuffer::putInt);
+    outputStream.write(multivalueBuffer.array());
+
+    for (Map.Entry<Long, ClusterByStatisticsSnapshot.Bucket> entry : 
buckets.entrySet()) {
+      writeLongToStream(outputStream, entry.getKey());
+      serializeBucket(outputStream, entry.getValue());
+    }
+  }
+
+  private static final int HEADER_OFFSET = 0;
+  private static final int BUCKET_COUNT_OFFSET = HEADER_OFFSET + Byte.BYTES;
+  private static final int MV_SET_SIZE_OFFSET = BUCKET_COUNT_OFFSET + 
Integer.BYTES;
+  private static final int MV_VALUES_OFFSET = MV_SET_SIZE_OFFSET + 
Integer.BYTES;
+
+  private static final int TIMECHUNK_OFFSET = 0;
+  private static final int BUCKET_SIZE_OFFSET = TIMECHUNK_OFFSET + Long.BYTES;
+  private static final int BUCKET_OFFSET = BUCKET_SIZE_OFFSET + Integer.BYTES;
+
+  public static ClusterByStatisticsSnapshot deserialize(ByteBuffer byteBuffer)
+  {
+    int position = byteBuffer.position();
+
+    final int bucketCount = byteBuffer.getInt(position + BUCKET_COUNT_OFFSET);
+    final int mvSetSize = byteBuffer.getInt(position + MV_SET_SIZE_OFFSET);
+
+    final Set<Integer> hasMultiValues = new HashSet<>();
+    for (int offset = position + MV_VALUES_OFFSET; offset < position + 
mvSetSize * Integer.BYTES; offset += Integer.BYTES) {
+      hasMultiValues.add(byteBuffer.getInt(offset));
+    }
+
+    final Map<Long, ClusterByStatisticsSnapshot.Bucket> buckets = new 
HashMap<>();
+
+    // Move the buffer position
+    int nextBucket = position + MV_VALUES_OFFSET + Integer.BYTES * mvSetSize;
+
+    for (int bucketNo = 0; bucketNo < bucketCount; bucketNo++) {
+      position = byteBuffer.position(nextBucket).position();
+
+      final long timeChunk = byteBuffer.getLong(position + TIMECHUNK_OFFSET);
+      final int snapshotSize = byteBuffer.getInt(position + 
BUCKET_SIZE_OFFSET);
+
+      final ByteBuffer duplicate = (ByteBuffer) byteBuffer.duplicate()
+                                                          
.order(byteBuffer.order())
+                                                          .position(position + 
BUCKET_OFFSET)
+                                                          .limit(position + 
BUCKET_OFFSET + snapshotSize);
+
+      ClusterByStatisticsSnapshot.Bucket bucket = deserializeBucket(duplicate);
+      buckets.put(timeChunk, bucket);
+
+      nextBucket = position + BUCKET_OFFSET + snapshotSize;
+    }
+
+    return new ClusterByStatisticsSnapshot(buckets, hasMultiValues);
+  }
+
+  /**
+   * Format:
+   * - 8 bytes: bytesRetained
+   * - 4 bytes: keyArray length
+   * - 4 bytes: snapshot length
+   * - keyArray length bytes: serialized key array
+   * - snapshot length bytes: serialized snapshot
+   */
+  static void serializeBucket(OutputStream outputStream, 
ClusterByStatisticsSnapshot.Bucket bucket) throws IOException
+  {
+    final byte[] bucketKeyArray = bucket.getBucketKey().array();
+    final double bytesRetained = bucket.getBytesRetained();
+
+    final KeyCollectorSnapshot snapshot = bucket.getKeyCollectorSnapshot();
+    final byte[] serializedSnapshot = 
snapshot.getSerializer().serialize(snapshot);
+
+    final int length = Double.BYTES + 2 * Integer.BYTES + 
bucketKeyArray.length + serializedSnapshot.length;
+
+    outputStream.write(
+        ByteBuffer.allocate(Integer.BYTES + length)

Review Comment:
   Please mention that 4 bytes are for the length. 
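   Something like this standalone sketch would make the extra Integer.BYTES self-explanatory (the method name is illustrative, not from the PR):

   ```java
   import java.nio.ByteBuffer;

   public class LengthPrefixSketch
   {
     // Prepends a 4-byte length prefix so readers can skip or slice the payload.
     public static byte[] withLengthPrefix(byte[] payload)
     {
       return ByteBuffer.allocate(Integer.BYTES + payload.length) // 4 bytes for the length prefix itself
                        .putInt(payload.length)                   // the length
                        .put(payload)                             // the payload
                        .array();
     }
   }
   ```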



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/serde/ClusterByStatisticsSnapshotSerde.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.statistics.serde;
+
+import org.apache.druid.frame.key.RowKey;
+import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
+import org.apache.druid.msq.statistics.KeyCollectorSnapshot;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Handles the serialization and deserialization of {@link 
ClusterByStatisticsSnapshot}, into a byte array.
+ */
+public class ClusterByStatisticsSnapshotSerde
+{
+  /**
+   * Deserializes the {@link ClusterByStatisticsSnapshot} and writes it to the 
{@link OutputStream}.
+   * <br>
+   * Format:
+   * - 1 byte : Header byte
+   * - 4 bytes: Number of buckets
+   * - 4 bytes: Number of entries in {@link 
ClusterByStatisticsSnapshot#getHasMultipleValues()}
+   * - 4 * number of multivalue bucket bytes: List of integers

Review Comment:
   ```suggestion
      * - 4 * each multivalue column: List of integers
   ```



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/serde/ClusterByStatisticsSnapshotSerde.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.statistics.serde;
+
+import org.apache.druid.frame.key.RowKey;
+import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
+import org.apache.druid.msq.statistics.KeyCollectorSnapshot;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Handles the serialization and deserialization of {@link 
ClusterByStatisticsSnapshot}, into a byte array.
+ */
+public class ClusterByStatisticsSnapshotSerde
+{
+  /**
+   * Deserializes the {@link ClusterByStatisticsSnapshot} and writes it to the 
{@link OutputStream}.
+   * <br>
+   * Format:
+   * - 1 byte : Header byte
+   * - 4 bytes: Number of buckets
+   * - 4 bytes: Number of entries in {@link 
ClusterByStatisticsSnapshot#getHasMultipleValues()}
+   * - 4 * number of multivalue bucket bytes: List of integers
+   * - number of buckets * buckets, serailized by {@link 
#serializeBucket(OutputStream, ClusterByStatisticsSnapshot.Bucket)}
+   */
+  public static void serialize(OutputStream outputStream, 
ClusterByStatisticsSnapshot snapshot) throws IOException
+  {
+    final Map<Long, ClusterByStatisticsSnapshot.Bucket> buckets = 
snapshot.getBuckets();
+    final Set<Integer> multipleValueBuckets = snapshot.getHasMultipleValues();
+
+    // Write a header byte, to be used to contain any metadata in the future.
+    outputStream.write(0x0);
+
+    writeIntToStream(outputStream, buckets.size());
+    ByteBuffer multivalueBuffer = ByteBuffer.allocate(Integer.BYTES + 
multipleValueBuckets.size())
+                                            
.putInt(multipleValueBuckets.size());
+    multipleValueBuckets.forEach(multivalueBuffer::putInt);
+    outputStream.write(multivalueBuffer.array());
+
+    for (Map.Entry<Long, ClusterByStatisticsSnapshot.Bucket> entry : 
buckets.entrySet()) {
+      writeLongToStream(outputStream, entry.getKey());
+      serializeBucket(outputStream, entry.getValue());
+    }
+  }
+
+  private static final int HEADER_OFFSET = 0;
+  private static final int BUCKET_COUNT_OFFSET = HEADER_OFFSET + Byte.BYTES;
+  private static final int MV_SET_SIZE_OFFSET = BUCKET_COUNT_OFFSET + 
Integer.BYTES;
+  private static final int MV_VALUES_OFFSET = MV_SET_SIZE_OFFSET + 
Integer.BYTES;
+
+  private static final int TIMECHUNK_OFFSET = 0;
+  private static final int BUCKET_SIZE_OFFSET = TIMECHUNK_OFFSET + Long.BYTES;
+  private static final int BUCKET_OFFSET = BUCKET_SIZE_OFFSET + Integer.BYTES;
+
+  public static ClusterByStatisticsSnapshot deserialize(ByteBuffer byteBuffer)
+  {
+    int position = byteBuffer.position();
+
+    final int bucketCount = byteBuffer.getInt(position + BUCKET_COUNT_OFFSET);
+    final int mvSetSize = byteBuffer.getInt(position + MV_SET_SIZE_OFFSET);
+
+    final Set<Integer> hasMultiValues = new HashSet<>();
+    for (int offset = position + MV_VALUES_OFFSET; offset < position + 
mvSetSize * Integer.BYTES; offset += Integer.BYTES) {

Review Comment:
   Shouldn't this have `MV_VALUES_OFFSET` in the end condition? 
   
   ```
   offset < position + MV_VALUES_OFFSET + mvSetSize * Integer.BYTES
   ```
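   A standalone sketch of the loop with the corrected bound, using the same offset layout as the PR (the buffer here is hand-built for illustration):

   ```java
   import java.nio.ByteBuffer;
   import java.util.HashSet;
   import java.util.Set;

   public class MvLoopSketch
   {
     // Mirrors the PR's layout: 1 header byte + 4-byte bucket count + 4-byte set size.
     public static final int MV_VALUES_OFFSET = Byte.BYTES + Integer.BYTES + Integer.BYTES;

     public static Set<Integer> readMvBuckets(ByteBuffer buf, int position, int mvSetSize)
     {
       Set<Integer> result = new HashSet<>();
       // Both ends of the bound include MV_VALUES_OFFSET; dropping it from the
       // end condition under-reads the multivalue integers.
       int end = position + MV_VALUES_OFFSET + mvSetSize * Integer.BYTES;
       for (int offset = position + MV_VALUES_OFFSET; offset < end; offset += Integer.BYTES) {
         result.add(buf.getInt(offset)); // absolute read; buffer position is untouched
       }
       return result;
     }
   }
   ```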



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

