imply-cheddar commented on code in PR #12879:
URL: https://github.com/apache/druid/pull/12879#discussion_r953223188


##########
processing/src/main/java/org/apache/druid/segment/serde/cell/IOIterator.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import java.io.IOException;
+
+public interface IOIterator<T>

Review Comment:
   Why the new interface?  It seems exactly equivalent to `Iterator` except for 
no `remove()` method? 
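
   For what it's worth, the usual motivation for this shape (not verifiable from the truncated diff above) is that `java.util.Iterator` cannot declare checked exceptions, so `hasNext()`/`next()` that do I/O need their own interface. A minimal self-contained sketch, with hypothetical names:

```java
import java.io.IOException;

// Hypothetical sketch: java.util.Iterator cannot declare checked exceptions,
// so an iterator whose hasNext()/next() perform I/O needs its own interface.
interface IOIteratorSketch<T>
{
  boolean hasNext() throws IOException;

  T next() throws IOException;
}

// Trivial implementation used only to show the shape; a real one would read from a stream.
class SingleValueIOIterator implements IOIteratorSketch<String>
{
  private boolean consumed = false;

  @Override
  public boolean hasNext()
  {
    return !consumed;
  }

  @Override
  public String next()
  {
    consumed = true;
    return "value";
  }
}
```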



##########
processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringBufferStore.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.serde.Serializer;
+import org.apache.druid.segment.serde.cell.CellWriter;
+import org.apache.druid.segment.serde.cell.DeserializingIOIterator;
+import org.apache.druid.segment.serde.cell.IOIterator;
+import org.apache.druid.segment.serde.cell.IntSerializer;
+import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.WriteOutBytes;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+
+public class SerializablePairLongStringBufferStore
+{
+  private static final SerializablePairLongStringSimpleStagedSerde SERDE =
+      new SerializablePairLongStringSimpleStagedSerde();
+
+  private final WriteOutBytes writeOutBytes;
+  private final IntSerializer intSerializer = new IntSerializer();
+
+  private long minValue = Long.MAX_VALUE;
+  private long maxValue = Long.MIN_VALUE;
+
+  public SerializablePairLongStringBufferStore(WriteOutBytes writeOutBytes)
+  {
+    this.writeOutBytes = writeOutBytes;
+  }
+
+  public void store(@Nullable SerializablePairLongString pairLongString) throws IOException
+  {
+    if (pairLongString != null && pairLongString.lhs != null) {
+      minValue = Math.min(minValue, pairLongString.lhs);
+      maxValue = Math.max(maxValue, pairLongString.lhs);
+    }
+
+    byte[] bytes = SERDE.serialize(pairLongString);
+
+    writeOutBytes.write(intSerializer.serialize(bytes.length));
+    writeOutBytes.write(bytes);
+  }
+
+  /**
+   * each call transfers the temporary buffer into an encoded, block-compressed buffer of the segment. It is ready to be
+   * transferred to a {@link WritableByteChannel}
+   *
+   * @param byteBufferProvider    - provides a ByteBuffer used for block compressed encoding
+   * @param segmentWriteOutMedium - used to create temporary storage
+   * @return encoded buffer ready to be stored
+   * @throws IOException
+   */
+  public TransferredBuffer transferToRowWriter(
+      NativeClearedByteBufferProvider byteBufferProvider,
+      SegmentWriteOutMedium segmentWriteOutMedium
+  ) throws IOException
+  {
+    SerializablePairLongStringColumnHeader columnHeader = createColumnHeader();
+    SerializablePairLongStringDeltaEncodedStagedSerde serde =
+        new SerializablePairLongStringDeltaEncodedStagedSerde(
+            columnHeader.getMinValue(),
+            columnHeader.isUseIntegerDeltas()
+        );
+
+    // try-with-resources will call cellWriter.close() an extra time in the normal case, but it protects against
+    // buffer leaking in the case of an exception. In the normal path, close() performs some finalization of
+    // the CellWriter object. We want that object state finalized before creating the TransferredBuffer as a point of
+    // good style (though strictly speaking, it works fine to pass it in before calling close since TransferredBuffer
+    // does not do anything in the constructor with the object)
+    try (CellWriter cellWriter = new CellWriter.Builder(byteBufferProvider, segmentWriteOutMedium).build()) {
+      IOIterator<SerializablePairLongString> bufferIterator = new DeserializingIOIterator<>(

Review Comment:
   You have the `iterator()` method and then don't call it from here?  I think 
you could call it from here?



##########
processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringColumnHeader.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.google.common.base.Objects;
+import org.apache.druid.segment.serde.cell.LongSerializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+public class SerializablePairLongStringColumnHeader
+{
+  // header size is 4 bytes for word alignment for LZ4 (minmatch) compression
+  private static final int HEADER_SIZE_BYTES = 4;
+  private static final int USE_INTEGER_MASK = 0x80;
+  private static final int VERSION_INDEX = 0;
+  private static final int ENCODING_INDEX = 1;
+
+  private final LongSerializer longSerializer = new LongSerializer();
+  private final byte[] bytes;
+  private final long minValue;
+
+  private SerializablePairLongStringColumnHeader(byte[] bytes, long minTimestamp)
+  {
+    this.bytes = bytes;
+    this.minValue = minTimestamp;
+  }
+
+  public SerializablePairLongStringColumnHeader(byte version, boolean useIntegerDeltas, long minTimestamp)
+  {
+    this.minValue = minTimestamp;
+    bytes = new byte[HEADER_SIZE_BYTES];
+    bytes[VERSION_INDEX] = version;
+
+    if (useIntegerDeltas) {
+      bytes[ENCODING_INDEX] |= USE_INTEGER_MASK;
+    }
+  }
+
+  public static SerializablePairLongStringColumnHeader fromBuffer(ByteBuffer byteBuffer)
+  {
+    byte[] bytes = new byte[HEADER_SIZE_BYTES];
+
+    byteBuffer.get(bytes);
+
+    long minTimestamp = byteBuffer.getLong();
+
+    return new SerializablePairLongStringColumnHeader(bytes, minTimestamp);
+  }
+
+  public SerializablePairLongStringDeltaEncodedStagedSerde createSerde()
+  {
+    return new SerializablePairLongStringDeltaEncodedStagedSerde(minValue, isUseIntegerDeltas());
+  }
+
+  public void transferTo(WritableByteChannel channel) throws IOException
+  {
+    channel.write(ByteBuffer.wrap(bytes));
+    channel.write(longSerializer.serialize(minValue));

Review Comment:
   I believe that the `longSerializer` object is only used in this method and 
this method is only called once in the lifetime of this object.  As such, I 
don't think `longSerializer` needs to be instantiated as a field, but can just 
be created here in this method, used and thrown away.
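
   Roughly this shape (a self-contained sketch; `LongSerializer`'s exact API isn't shown in this diff, so a plain `ByteBuffer` stands in for it, and the header bytes / min value are made-up placeholders):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;

class ColumnHeaderSketch
{
  // hypothetical placeholder values standing in for the real header state
  private final byte[] bytes = new byte[]{3, 0, 0, 0};
  private final long minValue = 1_000L;

  // the long-serializing buffer is created, used, and discarded inside the only
  // method that needs it, instead of living as a field for the object's lifetime
  public void transferTo(WritableByteChannel channel) throws IOException
  {
    channel.write(ByteBuffer.wrap(bytes));

    ByteBuffer longBuffer = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.nativeOrder());
    longBuffer.putLong(minValue);
    longBuffer.flip();
    channel.write(longBuffer);
  }
}
```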



##########
processing/src/main/java/org/apache/druid/segment/serde/cell/StagedSerde.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public interface StagedSerde<T>

Review Comment:
   I think this class could deserve some class-level javadoc to talk about what 
it's for.  I saw it used in one of the other classes, was like "hrm, what's a 
StagedSerde supposed to be?" and came here to learn about it.  Then was a bit 
sad that there was no class-level javadoc.  The method-level docs hint that 
there are some delayed serialization cases and there are some non-delayed, but 
not enough context to tell me about when/why I would want them.
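
   For anyone else landing here with the same question: judging from the method shapes visible elsewhere in this PR (`serializeDelayed()` returning a `StorableBuffer` with `getSerializedSize()` and `store(ByteBuffer)`), the apparent point is two-phase serialization: do the expensive prep once, learn the exact size, allocate a single buffer, then write. A self-contained sketch of that pattern with simplified, hypothetical names (not the actual Druid interfaces):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// hypothetical sketch of the "staged" idea: serializeDelayed() does the
// expensive prep (here, UTF-8 encoding) once, exposes the exact size, and
// defers the write until a correctly-sized buffer exists
interface StorableBufferSketch
{
  int getSerializedSize();

  void store(ByteBuffer buffer);
}

class StringSerdeSketch
{
  static StorableBufferSketch serializeDelayed(String value)
  {
    byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);

    return new StorableBufferSketch()
    {
      @Override
      public int getSerializedSize()
      {
        return Integer.BYTES + utf8.length;
      }

      @Override
      public void store(ByteBuffer buffer)
      {
        buffer.putInt(utf8.length);
        buffer.put(utf8);
      }
    };
  }

  // size everything first, allocate exactly once, then store each record
  static ByteBuffer serializeAll(List<String> values)
  {
    List<StorableBufferSketch> staged = new ArrayList<>();
    int totalSize = 0;

    for (String value : values) {
      StorableBufferSketch buffer = serializeDelayed(value);
      staged.add(buffer);
      totalSize += buffer.getSerializedSize();
    }

    ByteBuffer result = ByteBuffer.allocate(totalSize);
    staged.forEach(s -> s.store(result));
    result.flip();
    return result;
  }
}
```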



##########
processing/src/main/java/org/apache/druid/segment/serde/cell/CellWriter.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.serde.Serializer;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * <pre>
+ * serialized data is of the form:
+ *
+ *    <cell index>
+ *    <payload storage>
+ *
+ * each of these items is stored in compressed streams of blocks with a block index.
+ *
+ * A BlockCompressedPayloadWriter stores byte[] payloads. These may be accessed by creating a
+ * BlockCompressedPayloadReader over the produced ByteBuffer. Reads may be done by giving a location in the
+ * uncompressed stream and a size
+ *
+ * NOTE: {@link BlockCompressedPayloadBuffer} does not store nulls on write(). However, the cellIndex stores an entry
+ * with a size of 0 for nulls and {@link CellReader} will return null for any null written
+ *
+ * blockIndexSize:int
+ * |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+ * |      block index
+ * |      compressed block # -> block start in compressed stream position (relative to data start)
+ * |
+ * |      0: [block position: int]
+ * |      1: [block position: int]
+ * |      ...
+ * |      i: [block position: int]
+ * |      ...
+ * |      n: [block position: int]
+ * |      n+1: [total compressed size] // stored to simplify the invariant that size_i = position_i+1 - position_i
+ * |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+ * dataSize:int
+ * |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+ * | <compressed payload block 1>
+ * | <compressed payload block 2>
+ * | ...
+ * | <compressed payload block n>
+ * |
+ *
+ * the CellIndexWriter stores an array of longs using the BlockCompressedPayloadWriter
+ *
+ * logically this is an array of longs
+ *
+ * |    0: start_0 : long
+ * |    1: start_1 : long
+ * |    ...
+ * |    n: start_n : long
+ * |    n+1: start_n + length_n : long  // ie, next position that would have been written to
+ * |                                    // used again for invariant of length_i = row_i+1 - row_i
+ *
+ *      but this will be stored as block compressed. Reads are done by addressing it as a long array of bytes
+ *
+ * |    <block index size>
+ * |    <block index>
+ * |
+ * |    <data stream size>
+ * |    <block compressed payload stream>
+ *
+ * resulting in
+ *
+ * |    <cell index size>
+ * | ----cell index------------------------
+ * |    <block index size>
+ * |    <block index>
+ * |    <data stream size>
+ * |    <block compressed payload stream>
+ * | -------------------------------------
+ * |    <data stream size>
+ * | ----data stream------------------------
+ * |    <block index size>
+ * |    <block index>
+ * |    <data stream size>
+ * |    <block compressed payload stream>
+ * | -------------------------------------
+ * </pre>
+ */
+
+public class CellWriter implements Serializer, Closeable

Review Comment:
   There is great detail here about how this ends up serializing things, which 
is fabulous.  The class-level javadoc is, however, missing high level 
instructions about how it is expected to be used.  I.e. it has the details 
needed for someone who wants to understand the internals of it, but not the 
details needed for someone who just wants to use it as a black box.



##########
processing/src/main/java/org/apache/druid/segment/serde/cell/DeserializingIOIterator.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import com.google.common.base.Preconditions;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.NoSuchElementException;
+
+public class DeserializingIOIterator<T> implements IOIterator<T>

Review Comment:
   If you do the `FirstPassSerializingThingie` as suggested elsewhere, this 
class can perhaps just become an anonymous inner class of that.  It can also 
still be its own class, I'm mostly commenting just to plant the seed that it 
can be adjusted if you adjust that.



##########
processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringBufferStore.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.serde.Serializer;
+import org.apache.druid.segment.serde.cell.CellWriter;
+import org.apache.druid.segment.serde.cell.DeserializingIOIterator;
+import org.apache.druid.segment.serde.cell.IOIterator;
+import org.apache.druid.segment.serde.cell.IntSerializer;
+import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.WriteOutBytes;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+
+public class SerializablePairLongStringBufferStore
+{
+  private static final SerializablePairLongStringSimpleStagedSerde SERDE =
+      new SerializablePairLongStringSimpleStagedSerde();
+
+  private final WriteOutBytes writeOutBytes;
+  private final IntSerializer intSerializer = new IntSerializer();
+
+  private long minValue = Long.MAX_VALUE;
+  private long maxValue = Long.MIN_VALUE;
+
+  public SerializablePairLongStringBufferStore(WriteOutBytes writeOutBytes)
+  {
+    this.writeOutBytes = writeOutBytes;
+  }
+
+  public void store(@Nullable SerializablePairLongString pairLongString) throws IOException
+  {
+    if (pairLongString != null && pairLongString.lhs != null) {
+      minValue = Math.min(minValue, pairLongString.lhs);
+      maxValue = Math.max(maxValue, pairLongString.lhs);
+    }
+
+    byte[] bytes = SERDE.serialize(pairLongString);
+
+    writeOutBytes.write(intSerializer.serialize(bytes.length));
+    writeOutBytes.write(bytes);

Review Comment:
   There is a relatively common bit of work happening here in the "serialize to 
bytes and write to simply-encoded OutputStream so that we can then iterate over 
that later".  I think this could be meaningfully abstracted into a thing that 
is reusable.  The code here would change to 
`firstPassSerializingThingie.store(pairLongString);`
   
   And then in `transferToRowWriter` you could call 
`firstPassSerializingThingie.iterator()`.  With that done, you could make the 
constructor of this class depend on a 
`FirstPassSerializingThingie<SerializablePairLongString>`
   
   It doesn't really remove that many lines from this class, but it does make 
for a relatively simple API for reuse when doing 2-pass persists of a column?
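   
   A self-contained sketch of what such an abstraction might look like (the name comes from this review; `WriteOutBytes` and the real serdes are replaced with plain JDK types, and a `List` stands in for a lazy `IOIterator`):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// hypothetical sketch: first pass appends length-prefixed serialized records
// to a growable buffer; second pass reads them back in order
class FirstPassSerializingThingie<T>
{
  interface Serde<V>
  {
    byte[] serialize(V value);

    V deserialize(byte[] bytes);
  }

  private final Serde<T> serde;
  private final ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
  private final DataOutputStream dataOut = new DataOutputStream(bytesOut);

  FirstPassSerializingThingie(Serde<T> serde)
  {
    this.serde = serde;
  }

  // pass 1: serialize and append <length:int><bytes>
  void store(T value) throws IOException
  {
    byte[] bytes = serde.serialize(value);
    dataOut.writeInt(bytes.length);
    dataOut.write(bytes);
  }

  // pass 2: replay every stored record (a real version would expose a lazy IOIterator<T>)
  List<T> iterateAll() throws IOException
  {
    List<T> values = new ArrayList<>();
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytesOut.toByteArray()));

    while (in.available() > 0) {
      byte[] bytes = new byte[in.readInt()];
      in.readFully(bytes);
      values.add(serde.deserialize(bytes));
    }
    return values;
  }
}
```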



##########
processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringBufferStore.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.serde.Serializer;
+import org.apache.druid.segment.serde.cell.CellWriter;
+import org.apache.druid.segment.serde.cell.DeserializingIOIterator;
+import org.apache.druid.segment.serde.cell.IOIterator;
+import org.apache.druid.segment.serde.cell.IntSerializer;
+import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.WriteOutBytes;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+
+public class SerializablePairLongStringBufferStore
+{
+  private static final SerializablePairLongStringSimpleStagedSerde SERDE =
+      new SerializablePairLongStringSimpleStagedSerde();
+
+  private final WriteOutBytes writeOutBytes;
+  private final IntSerializer intSerializer = new IntSerializer();
+
+  private long minValue = Long.MAX_VALUE;
+  private long maxValue = Long.MIN_VALUE;
+
+  public SerializablePairLongStringBufferStore(WriteOutBytes writeOutBytes)
+  {
+    this.writeOutBytes = writeOutBytes;
+  }
+
+  public void store(@Nullable SerializablePairLongString pairLongString) throws IOException
+  {
+    if (pairLongString != null && pairLongString.lhs != null) {
+      minValue = Math.min(minValue, pairLongString.lhs);
+      maxValue = Math.max(maxValue, pairLongString.lhs);
+    }
+
+    byte[] bytes = SERDE.serialize(pairLongString);
+
+    writeOutBytes.write(intSerializer.serialize(bytes.length));
+    writeOutBytes.write(bytes);
+  }
+
+  /**
+   * each call transfers the temporary buffer into an encoded, block-compressed buffer of the segment. It is ready to be
+   * transferred to a {@link WritableByteChannel}
+   *
+   * @param byteBufferProvider    - provides a ByteBuffer used for block compressed encoding
+   * @param segmentWriteOutMedium - used to create temporary storage
+   * @return encoded buffer ready to be stored
+   * @throws IOException
+   */
+  public TransferredBuffer transferToRowWriter(
+      NativeClearedByteBufferProvider byteBufferProvider,
+      SegmentWriteOutMedium segmentWriteOutMedium
+  ) throws IOException
+  {
+    SerializablePairLongStringColumnHeader columnHeader = createColumnHeader();
+    SerializablePairLongStringDeltaEncodedStagedSerde serde =
+        new SerializablePairLongStringDeltaEncodedStagedSerde(
+            columnHeader.getMinValue(),
+            columnHeader.isUseIntegerDeltas()
+        );
+
+    // try-with-resources will call cellWriter.close() an extra time in the normal case, but it protects against
+    // buffer leaking in the case of an exception. In the normal path, close() performs some finalization of
+    // the CellWriter object. We want that object state finalized before creating the TransferredBuffer as a point of
+    // good style (though strictly speaking, it works fine to pass it in before calling close since TransferredBuffer
+    // does not do anything in the constructor with the object)
+    try (CellWriter cellWriter = new CellWriter.Builder(byteBufferProvider, segmentWriteOutMedium).build()) {
+      IOIterator<SerializablePairLongString> bufferIterator = new DeserializingIOIterator<>(
+          writeOutBytes.asInputStream(),

Review Comment:
   The `InputStream` returned by this is not guaranteed to be closed.  I 
haven't checked to see if that's a problem or not.
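   
   If it does turn out to matter, the usual fix is try-with-resources scoped to the iteration. A self-contained sketch demonstrating that the stream gets closed (the tracking subclass is purely illustrative, assuming the iterator does not take ownership of the stream):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// purely illustrative: records whether close() was called
class TrackingInputStream extends ByteArrayInputStream
{
  boolean closed = false;

  TrackingInputStream(byte[] bytes)
  {
    super(bytes);
  }

  @Override
  public void close() throws IOException
  {
    closed = true;
    super.close();
  }
}

class StreamCloseDemo
{
  static boolean consumeAndClose(byte[] bytes) throws IOException
  {
    TrackingInputStream trackingStream = new TrackingInputStream(bytes);

    // try-with-resources guarantees close() even if the loop body throws
    try (InputStream stream = trackingStream) {
      while (stream.read() != -1) {
        // consume; in the PR this is where the DeserializingIOIterator would read
      }
    }
    return trackingStream.closed;
  }
}
```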



##########
processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexColumn.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.segment.column.ComplexColumn;
+import org.apache.druid.segment.serde.cell.CellReader;
+import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class SerializablePairLongStringComplexColumn implements ComplexColumn
+{
+  private final Closer closer;
+  private final int serializedSize;
+  private final CellReader cellReader;
+  private final SerializablePairLongStringDeltaEncodedStagedSerde serde;
+
+  public SerializablePairLongStringComplexColumn(
+      CellReader cellReader,
+      SerializablePairLongStringDeltaEncodedStagedSerde serde,
+      Closer closer,
+      int serializedSize
+  )
+  {
+    this.cellReader = cellReader;
+    this.serde = serde;
+    this.closer = closer;
+    this.serializedSize = serializedSize;
+  }
+
+  @Override
+  public Class<?> getClazz()
+  {
+    return SerializablePairLongString.class;
+  }
+
+  @Override
+  public String getTypeName()
+  {
+    return SerializablePairLongStringComplexMetricSerde.TYPE_NAME;
+  }
+
+  @SuppressWarnings("ConstantConditions")
+  @Override
+  public Object getRowValue(int rowNum)
+  {
+    // nulls are handled properly by the aggregator

Review Comment:
   Maybe change this comment to 
   
   ```
   // This can return nulls, meaning that it is expected that anything reading from this does
   // something "good" with null.  At time of writing, the aggregators handle null properly
   ```
   



##########
processing/src/main/java/org/apache/druid/segment/serde/cell/CellReader.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import org.apache.druid.segment.data.CompressionStrategy;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class CellReader

Review Comment:
   Given that `CellReader` is one of the entry points for intended reuse, I 
would like to see some class-level javadoc that tries to explain how the thing 
is expected to be used. 



##########
processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringSimpleStagedSerde.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.serde.cell.StagedSerde;
+import org.apache.druid.segment.serde.cell.StorableBuffer;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * serializes a Long/String pair as
+ * Long:Integer:bytes
+ * <p>
+ * or
+ * Long:StringSize:StringData
+ */
+public class SerializablePairLongStringSimpleStagedSerde implements StagedSerde<SerializablePairLongString>
+{
+  private static final byte[] EMPTY_BYTES = new byte[0];
+
+  @Override
+  public StorableBuffer serializeDelayed(@Nullable SerializablePairLongString value)
+  {
+    if (value == null) {
+      return StorableBuffer.EMPTY;
+    }
+
+    String rhsString = value.rhs;
+    byte[] rhsBytes = stringToUtf8Bytes(rhsString);
+
+    return new StorableBuffer()
+    {
+      @Override
+      public void store(ByteBuffer byteBuffer)
+      {
+        Preconditions.checkNotNull(value.lhs, "Long in SerializablePairLongString must be non-null");
+
+        byteBuffer.putLong(value.lhs);
+        byteBuffer.putInt(rhsBytes.length);
+
+        if (rhsBytes.length > 0) {
+          byteBuffer.put(rhsBytes);
+        }
+      }
+
+      @Override
+      public int getSerializedSize()
+      {
+        return Long.BYTES + Integer.BYTES + rhsBytes.length;
+      }
+    };
+  }
+
+  @Nullable
+  @Override
+  public SerializablePairLongString deserialize(ByteBuffer byteBuffer)
+  {
+    if (byteBuffer.remaining() == 0) {
+      return null;
+    }
+
+    ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer().order(ByteOrder.nativeOrder());
+    long lhs = readOnlyBuffer.getLong();
+    int stringSize = readOnlyBuffer.getInt();
+    String lastString = null;
+
+    if (stringSize > 0) {
+      byte[] stringBytes = new byte[stringSize];
+
+      readOnlyBuffer.get(stringBytes, 0, stringSize);
+      lastString = StringUtils.fromUtf8(stringBytes);
+    }
+
+    return new SerializablePairLongString(lhs, lastString);
+  }
+
+  private static byte[] stringToUtf8Bytes(@Nullable String value)

Review Comment:
   `StringUtils.toUtf8WithNullAsEmpty`



##########
processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringDeltaEncodedStagedSerde.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.serde.cell.StagedSerde;
+import org.apache.druid.segment.serde.cell.StorableBuffer;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * serializes a Long/String pair in the context of a column/segment. Uses the minValue to perform delta
+ * encoding/decoding and if the range of the segment fits in an integer (useIntegerDelta), the format is
+ * Integer:Integer:bytes
+ *
+ * otherwise
+ * Long:Integer:bytes
+ */
+public class SerializablePairLongStringDeltaEncodedStagedSerde implements StagedSerde<SerializablePairLongString>
+{
+  private static final byte[] EMPTY_BYTES = new byte[0];
+
+  private final long minValue;
+  private final boolean useIntegerDelta;
+
+  public SerializablePairLongStringDeltaEncodedStagedSerde(long minValue, boolean useIntegerDelta)
+  {
+    this.minValue = minValue;
+    this.useIntegerDelta = useIntegerDelta;
+  }
+
+  @Override
+  public StorableBuffer serializeDelayed(@Nullable SerializablePairLongString value)
+  {
+    if (value == null) {
+      return StorableBuffer.EMPTY;
+    }
+
+    String rhsString = value.rhs;
+    byte[] rhsBytes = stringToUtf8Bytes(rhsString);
+
+    return new StorableBuffer()
+    {
+      @Override
+      public void store(ByteBuffer byteBuffer)
+      {
+        Preconditions.checkNotNull(value.lhs, "Long in SerializablePairLongString must be non-null");
+
+        long delta = value.lhs - minValue;
+
+        Preconditions.checkState(delta >= 0 || delta == value.lhs);
+
+        if (useIntegerDelta) {
+          byteBuffer.putInt(Ints.checkedCast(delta));
+        } else {
+          byteBuffer.putLong(delta);
+        }
+
+        byteBuffer.putInt(rhsBytes.length);
+
+        if (rhsBytes.length > 0) {
+          byteBuffer.put(rhsBytes);
+        }
+      }
+
+      @Override
+      public int getSerializedSize()
+      {
+        return (useIntegerDelta ? Integer.BYTES : Long.BYTES) + Integer.BYTES + rhsBytes.length;
+      }
+    };
+  }
+
+  @Nullable
+  @Override
+  public SerializablePairLongString deserialize(ByteBuffer byteBuffer)
+  {
+    if (byteBuffer.remaining() == 0) {
+      return null;
+    }
+
+    ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer().order(ByteOrder.nativeOrder());
+    long lhs;
+
+    if (useIntegerDelta) {
+      lhs = readOnlyBuffer.getInt();
+    } else {
+      lhs = readOnlyBuffer.getLong();
+    }
+
+    lhs += minValue;
+
+    int stringSize = readOnlyBuffer.getInt();
+    String lastString = null;
+
+    if (stringSize > 0) {
+      byte[] stringBytes = new byte[stringSize];
+
+      readOnlyBuffer.get(stringBytes, 0, stringSize);
+      lastString = StringUtils.fromUtf8(stringBytes);
+    }
+
+    return new SerializablePairLongString(lhs, lastString);
+  }
+
+  private static byte[] stringToUtf8Bytes(@Nullable String value)
+  {
+    return value == null ? EMPTY_BYTES : StringUtils.toUtf8(value);

Review Comment:
   There appears to be a `StringUtils.toUtf8WithNullToEmpty` that is exactly the semantics of this method, I think.
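
As an aside for readers of the review, the delta encoding described in the class javadoc above can be exercised independently of the Druid classes. This is a minimal sketch of the format `[delta as int or long][string size][string bytes]`; the method names are illustrative, not the PR's API:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public class DeltaEncodedPairDemo {
    // Serialize one (long, String) pair, storing the long as a delta from minValue.
    public static ByteBuffer serialize(long minValue, boolean useIntegerDelta, long lhs, String rhs) {
        byte[] rhsBytes = rhs == null ? new byte[0] : rhs.getBytes(StandardCharsets.UTF_8);
        int size = (useIntegerDelta ? Integer.BYTES : Long.BYTES) + Integer.BYTES + rhsBytes.length;
        ByteBuffer buf = ByteBuffer.allocate(size).order(ByteOrder.nativeOrder());
        long delta = lhs - minValue;
        if (useIntegerDelta) {
            buf.putInt(Math.toIntExact(delta));  // throws if the delta does not fit in an int
        } else {
            buf.putLong(delta);
        }
        buf.putInt(rhsBytes.length);
        buf.put(rhsBytes);
        buf.flip();
        return buf;
    }

    // Read back the long, undoing the delta encoding; leaves the buffer
    // positioned at the string-size field.
    public static long deserializeLhs(long minValue, boolean useIntegerDelta, ByteBuffer buf) {
        long delta = useIntegerDelta ? buf.getInt() : buf.getLong();
        return delta + minValue;
    }
}
```

The int-vs-long branch is what makes the per-row cost `Integer.BYTES` instead of `Long.BYTES` when the segment's timestamp range fits in 32 bits.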



##########
processing/src/test/java/org/apache/druid/segment/serde/cell/CellWriterTest.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium;
+
+public class CellWriterTest extends BytesReadWriteTestBase

Review Comment:
   This is a super meta comment: this PR is introducing classes `CellWriter` and `CellReader` that are intended to be reusable.  There is a `CellWriter` test, but no `CellReader` test, and I haven't dug in yet to validate that there is some other test which uses both of them at the same time.  Sometimes, when people are trying to learn how to use a thing that is intended to be reusable, they look for test classes that can show them the usage pattern.  At least, right now, I do not know which test classes to point someone at to understand how to use `CellWriter` and `CellReader`.
   
   Even for the `CellWriterTest`, it is currently leveraging the `TestBase`, which really helps with validating technical correctness, but the abstraction doesn't make it easy to see how to use the thing as a black box.  I made other comments asking for class-level javadoc; I'd also like to suggest a test somewhere whose goal is more to show how to use the thing and less to validate technical correctness (of course, it should validate correctness too; the primary intent of the test should just be an example of how to use the APIs).  The class-level javadoc can refer to that test to help solidify how the thing should be used as well.  This suggestion is not saying that the current tests should be changed; it is suggesting the addition of a test somewhere that exemplifies the intended usage pattern.
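   
   One possible shape for such a usage-pattern test, sketched in pseudocode (the `CellWriter`/`CellReader` construction and method names here are hypothetical, purely to illustrate the intended write-then-read round trip):
   
   ```
   // Hypothetical usage-pattern test; builder/method names are illustrative only.
   medium = new OnHeapMemorySegmentWriteOutMedium()
   writer = new CellWriter(medium)
   for row in rows:
       writer.write(serde.serialize(row))          // one cell per row
   writer.close()
   
   buffer = copyWriterOutputToByteBuffer(writer)   // however the medium is materialized
   reader = new CellReader(buffer)
   for i in 0..rows.size:
       assert serde.deserialize(reader.getCell(i)) == rows[i]
   ```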



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to