imply-cheddar commented on code in PR #12879: URL: https://github.com/apache/druid/pull/12879#discussion_r953223188
########## processing/src/main/java/org/apache/druid/segment/serde/cell/IOIterator.java: ########## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.serde.cell; + +import java.io.IOException; + +public interface IOIterator<T> Review Comment: Why the new interface? It seems exactly equivalent to `Iterator` except for no `remove()` method? ########## processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringBufferStore.java: ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; +import org.apache.druid.segment.serde.Serializer; +import org.apache.druid.segment.serde.cell.CellWriter; +import org.apache.druid.segment.serde.cell.DeserializingIOIterator; +import org.apache.druid.segment.serde.cell.IOIterator; +import org.apache.druid.segment.serde.cell.IntSerializer; +import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider; +import org.apache.druid.segment.writeout.SegmentWriteOutMedium; +import org.apache.druid.segment.writeout.WriteOutBytes; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.channels.WritableByteChannel; + +public class SerializablePairLongStringBufferStore +{ + private static final SerializablePairLongStringSimpleStagedSerde SERDE = + new SerializablePairLongStringSimpleStagedSerde(); + + private final WriteOutBytes writeOutBytes; + private final IntSerializer intSerializer = new IntSerializer(); + + private long minValue = Long.MAX_VALUE; + private long maxValue = Long.MIN_VALUE; + + public SerializablePairLongStringBufferStore(WriteOutBytes writeOutBytes) + { + this.writeOutBytes = writeOutBytes; + } + + public void store(@Nullable SerializablePairLongString pairLongString) throws IOException + { + if (pairLongString != null && pairLongString.lhs != null) { + minValue = Math.min(minValue, pairLongString.lhs); + maxValue = Math.max(maxValue, pairLongString.lhs); + } + + byte[] bytes = 
SERDE.serialize(pairLongString); + + writeOutBytes.write(intSerializer.serialize(bytes.length)); + writeOutBytes.write(bytes); + } + + /** + * each call transfers the temporary buffer into an encoded, block-compressed buffer of the segment. It is ready to be + * transferred to a {@link WritableByteChannel} + * + * @param byteBufferProvider - provides a ByteBuffer used for block compressed encoding + * @param segmentWriteOutMedium - used to create temporary storage + * @return encoded buffer ready to be stored + * @throws IOException + */ + public TransferredBuffer transferToRowWriter( + NativeClearedByteBufferProvider byteBufferProvider, + SegmentWriteOutMedium segmentWriteOutMedium + ) throws IOException + { + SerializablePairLongStringColumnHeader columnHeader = createColumnHeader(); + SerializablePairLongStringDeltaEncodedStagedSerde serde = + new SerializablePairLongStringDeltaEncodedStagedSerde( + columnHeader.getMinValue(), + columnHeader.isUseIntegerDeltas() + ); + + // try-with-resources will call cellWriter.close() an extra time in the normal case, but it protects against + // buffer leaking in the case of an exception. In the normal path, close() performs some finalization of + // the CellWriter object. We want that object state finalized before creating the TransferredBuffer as a point of + // good style (though strictly speaking, it works fine to pass it in before calling close since TransferredBuffer + // does not do anything in the constructor with the object) + try (CellWriter cellWriter = new CellWriter.Builder(byteBufferProvider, segmentWriteOutMedium).build()) { + IOIterator<SerializablePairLongString> bufferIterator = new DeserializingIOIterator<>( Review Comment: You have the `iterator()` method but don't call it from here? I think you could.
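On the `IOIterator` question earlier in the thread: one plausible answer is that `java.util.Iterator#next` cannot declare a checked `IOException`, which is exactly what an iterator backed by an `InputStream` needs to throw. A minimal JDK-only sketch of that distinction (hypothetical names; not the PR's actual code):

```java
import java.io.IOException;

public class IOIteratorSketch {
  // Hypothetical sketch: java.util.Iterator#next() cannot declare a checked
  // exception, so an iterator that reads from I/O (and may hit IOException)
  // needs its own interface rather than implementing java.util.Iterator.
  public interface IOIterator<T> {
    boolean hasNext() throws IOException;

    T next() throws IOException;
  }

  // Toy factory backed by an in-memory array, just to show the shape; a real
  // implementation (like DeserializingIOIterator) would read from an InputStream.
  public static IOIterator<String> fromArray(String... values) {
    return new IOIterator<String>() {
      private int i = 0;

      @Override
      public boolean hasNext() {
        return i < values.length;
      }

      @Override
      public String next() throws IOException {
        return values[i++];
      }
    };
  }

  // Consumers must propagate (or handle) IOException, unlike with Iterator.
  public static String join(IOIterator<String> iterator) throws IOException {
    StringBuilder sb = new StringBuilder();
    while (iterator.hasNext()) {
      sb.append(iterator.next());
    }
    return sb.toString();
  }

  public static void main(String[] args) throws IOException {
    System.out.println(join(fromArray("a", "b", "c"))); // prints "abc"
  }
}
```

Wrapping the exception in an unchecked type and using plain `Iterator` would be the main alternative; a dedicated interface keeps the I/O failure mode visible in the signature.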
########## processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringColumnHeader.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import com.google.common.base.Objects; +import org.apache.druid.segment.serde.cell.LongSerializer; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; + +public class SerializablePairLongStringColumnHeader +{ + // header size is 4 bytes for word alignment for LZ4 (minmatch) compression + private static final int HEADER_SIZE_BYTES = 4; + private static final int USE_INTEGER_MASK = 0x80; + private static final int VERSION_INDEX = 0; + private static final int ENCODING_INDEX = 1; + + private final LongSerializer longSerializer = new LongSerializer(); + private final byte[] bytes; + private final long minValue; + + private SerializablePairLongStringColumnHeader(byte[] bytes, long minTimestamp) + { + this.bytes = bytes; + this.minValue = minTimestamp; + } + + public SerializablePairLongStringColumnHeader(byte version, boolean useIntegerDeltas, long minTimestamp) + { + this.minValue = minTimestamp; + bytes = new 
byte[HEADER_SIZE_BYTES]; + bytes[VERSION_INDEX] = version; + + if (useIntegerDeltas) { + bytes[ENCODING_INDEX] |= USE_INTEGER_MASK; + } + } + + public static SerializablePairLongStringColumnHeader fromBuffer(ByteBuffer byteBuffer) + { + byte[] bytes = new byte[HEADER_SIZE_BYTES]; + + byteBuffer.get(bytes); + + long minTimestamp = byteBuffer.getLong(); + + return new SerializablePairLongStringColumnHeader(bytes, minTimestamp); + } + + public SerializablePairLongStringDeltaEncodedStagedSerde createSerde() + { + return new SerializablePairLongStringDeltaEncodedStagedSerde(minValue, isUseIntegerDeltas()); + } + + public void transferTo(WritableByteChannel channel) throws IOException + { + channel.write(ByteBuffer.wrap(bytes)); + channel.write(longSerializer.serialize(minValue)); Review Comment: I believe that the `longSerializer` object is only used in this method and this method is only called once in the lifetime of this object. As such, I don't think `longSerializer` needs to be instantiated as a field, but can just be created here in this method, used and thrown away. ########## processing/src/main/java/org/apache/druid/segment/serde/cell/StagedSerde.java: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.serde.cell; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +public interface StagedSerde<T> Review Comment: I think this class could deserve some class-level javadoc to talk about what it's for. I saw it used in one of the other classes, was like "hrm, what's a StagedSerde supposed to be?" and came here to learn about it. Then was a bit sad that there was no class-level javadoc. The method-level docs hint that there are some delayed serialization cases and there are some non-delayed, but not enough context to tell me about when/why I would want them. ########## processing/src/main/java/org/apache/druid/segment/serde/cell/CellWriter.java: ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.serde.cell; + +import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; +import org.apache.druid.segment.data.CompressionStrategy; +import org.apache.druid.segment.serde.Serializer; +import org.apache.druid.segment.writeout.SegmentWriteOutMedium; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; + +/** + * <pre> + * serialized data is of the form: + * + * <cell index> + * <payload storage> + * + * each of these items is stored in compressed streams of blocks with a block index. + * + * A BlockCompressedPayloadWriter stores byte[] payloads. These may be accessed by creating a + * BlockCompressedPayloadReader over the produced ByteBuffer. Reads may be done by giving a location in the + * uncompressed stream and a size + * + * NOTE: {@link BlockCompressedPayloadBuffer} does not store nulls on write(). However, the cellIndex stores an entry + * with a size of 0 for nulls and {@link CellReader} will return null for any null written + * + * * blockIndexSize:int + * ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| + * | block index + * | compressed block # -> block start in compressed stream position (relative to data start) + * | + * | 0: [block position: int] + * | 1: [block position: int] + * | ... + * | i: [block position: int] + * | ... + * | n: [block position: int] + * | n+1: [total compressed size ] // stored to simplify invariant of + * ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| + * dataSize:int + * ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| + * | <compressed payload block 1> + * | <compressed payload block 2> + * | ... + * | <compressed paylod block n> + * | + * + * the CellIndexWriter stores an array of longs using the BlockCompressedPayloadWriter + * + * logically this an array of longs + * + * | 0: start_0 : long + * | 1: start_1 : long + * | ... 
+ * | n: start_n : long + * | n+1: start_n + length_n : long //ie, next position that would have been written to + * //used again for invariant of length_i = row_i+1 - row_i + * + * but this will be stored as block compressed. Reads are done by addressing it as a long array of bytes + * + * | <block index size> + * | <block index> + * | + * | <data stream size> + * | <block compressed payload stream> + * + * resulting in + * + * | <cell index size> + * | ----cell index------------------------ + * | <block index size> + * | <block index> + * | <data stream size> + * | <block compressed payload stream> + * | ------------------------------------- + * | <data stream size> + * | ----data stream------------------------ + * | <block index size> + * | <block index> + * | <data stream size> + * | <block compressed payload stream> + * | ------------------------------------- + * </pre> + */ + +public class CellWriter implements Serializer, Closeable Review Comment: There is great detail here about how this ends up serializing things, which is fabulous. The class-level javadoc is, however, missing high level instructions about how it is expected to be used. I.e. it has the details needed for someone who wants to understand the internals of it, but not the details needed for someone who just wants to use it as a black box. ########## processing/src/main/java/org/apache/druid/segment/serde/cell/DeserializingIOIterator.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.serde.cell; + +import com.google.common.base.Preconditions; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.NoSuchElementException; + +public class DeserializingIOIterator<T> implements IOIterator<T> Review Comment: If you do the `FirstPassSerializingThingie` as suggested elsewhere, this class can perhaps just become an anonymous inner class of that. It can also still be its own class, I'm mostly commenting just to plant the seed that it can be adjusted if you adjust that. ########## processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringBufferStore.java: ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; +import org.apache.druid.segment.serde.Serializer; +import org.apache.druid.segment.serde.cell.CellWriter; +import org.apache.druid.segment.serde.cell.DeserializingIOIterator; +import org.apache.druid.segment.serde.cell.IOIterator; +import org.apache.druid.segment.serde.cell.IntSerializer; +import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider; +import org.apache.druid.segment.writeout.SegmentWriteOutMedium; +import org.apache.druid.segment.writeout.WriteOutBytes; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.channels.WritableByteChannel; + +public class SerializablePairLongStringBufferStore +{ + private static final SerializablePairLongStringSimpleStagedSerde SERDE = + new SerializablePairLongStringSimpleStagedSerde(); + + private final WriteOutBytes writeOutBytes; + private final IntSerializer intSerializer = new IntSerializer(); + + private long minValue = Long.MAX_VALUE; + private long maxValue = Long.MIN_VALUE; + + public SerializablePairLongStringBufferStore(WriteOutBytes writeOutBytes) + { + this.writeOutBytes = writeOutBytes; + } + + public void store(@Nullable SerializablePairLongString pairLongString) throws IOException + { + if (pairLongString != null && pairLongString.lhs != null) { + minValue = Math.min(minValue, pairLongString.lhs); + maxValue = Math.max(maxValue, pairLongString.lhs); + } + + byte[] bytes = SERDE.serialize(pairLongString); + + writeOutBytes.write(intSerializer.serialize(bytes.length)); + writeOutBytes.write(bytes); Review Comment: There is a relatively common bit of work happening here in the "serialize to bytes and write to simply-encoded OutputStream so that we can then iterate over that later". 
I think this could be meaningfully abstracted into a thing that is reusable. The code here would change to `firstPassSerializingThingie.store(pairLongString);` And then in `transferToRowWriter` you could call `firstPassSerializingThingie.iterator()`. With that done, you could make the constructor of this class depend on a `FirstPassSerializingThingie<SerializablePairLongString>` It doesn't really remove that many lines from this class, but it does make for a relatively simple API for reuse when doing 2-pass persists of a column? ########## processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringBufferStore.java: ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; +import org.apache.druid.segment.serde.Serializer; +import org.apache.druid.segment.serde.cell.CellWriter; +import org.apache.druid.segment.serde.cell.DeserializingIOIterator; +import org.apache.druid.segment.serde.cell.IOIterator; +import org.apache.druid.segment.serde.cell.IntSerializer; +import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider; +import org.apache.druid.segment.writeout.SegmentWriteOutMedium; +import org.apache.druid.segment.writeout.WriteOutBytes; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.channels.WritableByteChannel; + +public class SerializablePairLongStringBufferStore +{ + private static final SerializablePairLongStringSimpleStagedSerde SERDE = + new SerializablePairLongStringSimpleStagedSerde(); + + private final WriteOutBytes writeOutBytes; + private final IntSerializer intSerializer = new IntSerializer(); + + private long minValue = Long.MAX_VALUE; + private long maxValue = Long.MIN_VALUE; + + public SerializablePairLongStringBufferStore(WriteOutBytes writeOutBytes) + { + this.writeOutBytes = writeOutBytes; + } + + public void store(@Nullable SerializablePairLongString pairLongString) throws IOException + { + if (pairLongString != null && pairLongString.lhs != null) { + minValue = Math.min(minValue, pairLongString.lhs); + maxValue = Math.max(maxValue, pairLongString.lhs); + } + + byte[] bytes = SERDE.serialize(pairLongString); + + writeOutBytes.write(intSerializer.serialize(bytes.length)); + writeOutBytes.write(bytes); + } + + /** + * each call transfers the temporary buffer into an encoded, block-compessed buffer of the segment. 
It is ready to be + * transferred to a {@link WritableByteChannel} + * + * @param byteBufferProvider - provides a ByteBuffer used for block compressed encoding + * @param segmentWriteOutMedium - used to create temporary storage + * @return encoded buffer ready to be stored + * @throws IOException + */ + public TransferredBuffer transferToRowWriter( + NativeClearedByteBufferProvider byteBufferProvider, + SegmentWriteOutMedium segmentWriteOutMedium + ) throws IOException + { + SerializablePairLongStringColumnHeader columnHeader = createColumnHeader(); + SerializablePairLongStringDeltaEncodedStagedSerde serde = + new SerializablePairLongStringDeltaEncodedStagedSerde( + columnHeader.getMinValue(), + columnHeader.isUseIntegerDeltas() + ); + + // try-with-resources will call cellWriter.close() an extra time in the normal case, but it protects against + // buffer leaking in the case of an exception. In the normal path, close() performs some finalization of + // the CellWriter object. We want that object state finalized before creating the TransferredBuffer as a point of + // good style (though strictly speaking, it works fine to pass it in before calling close since TransferredBuffer + // does not do anything in the constructor with the object) + try (CellWriter cellWriter = new CellWriter.Builder(byteBufferProvider, segmentWriteOutMedium).build()) { + IOIterator<SerializablePairLongString> bufferIterator = new DeserializingIOIterator<>( + writeOutBytes.asInputStream(), Review Comment: The `InputStream` returned by this is not guaranteed to be closed. I haven't checked to see if that's a problem or not. ########## processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexColumn.java: ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import com.google.common.base.Preconditions; +import org.apache.druid.collections.ResourceHolder; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.segment.column.ComplexColumn; +import org.apache.druid.segment.serde.cell.CellReader; +import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +public class SerializablePairLongStringComplexColumn implements ComplexColumn +{ + private final Closer closer; + private final int serializedSize; + private final CellReader cellReader; + private final SerializablePairLongStringDeltaEncodedStagedSerde serde; + + public SerializablePairLongStringComplexColumn( + CellReader cellReader, + SerializablePairLongStringDeltaEncodedStagedSerde serde, + Closer closer, + int serializedSize + ) + { + this.cellReader = cellReader; + this.serde = serde; + this.closer = closer; + this.serializedSize = serializedSize; + } + + @Override + public Class<?> getClazz() + { + return SerializablePairLongString.class; + } + + @Override + public String getTypeName() + { + return SerializablePairLongStringComplexMetricSerde.TYPE_NAME; + } + + @SuppressWarnings("ConstantConditions") + @Override + public Object getRowValue(int rowNum) + { + 
// nulls are handled properly by the aggregator Review Comment: Maybe change this comment to ``` // This can return nulls, meaning that it is expected that anything reading from this does // something "good" with null. At time of writing, the aggregators handle null properly ``` ########## processing/src/main/java/org/apache/druid/segment/serde/cell/CellReader.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.serde.cell; + +import org.apache.druid.segment.data.CompressionStrategy; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +public class CellReader Review Comment: Given that `CellReader` is one of the entry points for intended reuse, I would like to see some class-level javadoc that tries to explain how the thing is expected to be used. ########## processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringSimpleStagedSerde.java: ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.segment.serde.cell.StagedSerde; +import org.apache.druid.segment.serde.cell.StorableBuffer; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +/** + * serializes a Long/String pair as + * Long:Integer:bytes + * <p> + * or + * Long:StringSize:StringData + */ +public class SerializablePairLongStringSimpleStagedSerde implements StagedSerde<SerializablePairLongString> +{ + private static final byte[] EMPTY_BYTES = new byte[0]; + + @Override + public StorableBuffer serializeDelayed(@Nullable SerializablePairLongString value) + { + if (value == null) { + return StorableBuffer.EMPTY; + } + + String rhsString = value.rhs; + byte[] rhsBytes = stringToUtf8Bytes(rhsString); + + return new StorableBuffer() + { + @Override + public void store(ByteBuffer byteBuffer) + { + Preconditions.checkNotNull(value.lhs, "Long in SerializablePairLongString must be non-null"); + + byteBuffer.putLong(value.lhs); + byteBuffer.putInt(rhsBytes.length); + + if (rhsBytes.length > 0) { + byteBuffer.put(rhsBytes); + } + } + + @Override + public int getSerializedSize() + { + return Long.BYTES + Integer.BYTES + rhsBytes.length; + } + }; + } + + @Nullable + @Override + public 
SerializablePairLongString deserialize(ByteBuffer byteBuffer) + { + if (byteBuffer.remaining() == 0) { + return null; + } + + ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer().order(ByteOrder.nativeOrder()); + long lhs = readOnlyBuffer.getLong(); + int stringSize = readOnlyBuffer.getInt(); + String lastString = null; + + if (stringSize > 0) { + byte[] stringBytes = new byte[stringSize]; + + readOnlyBuffer.get(stringBytes, 0, stringSize); + lastString = StringUtils.fromUtf8(stringBytes); + } + + return new SerializablePairLongString(lhs, lastString); + } + + private static byte[] stringToUtf8Bytes(@Nullable String value) Review Comment: `StringUtils.toUtf8WithNullToEmpty` ########## processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringDeltaEncodedStagedSerde.java: ########## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.serde.cell.StagedSerde;
+import org.apache.druid.segment.serde.cell.StorableBuffer;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * serializes a Long/String pair in the context of a column/segment. Uses the minValue to perform delta
+ * encoding/decoding and if the range of the segment fits in an integer (useIntegerDelta), the format is
+ * Integer:Integer:bytes
+ *
+ * otherwise
+ * Long:Integer:bytes
+ */
+public class SerializablePairLongStringDeltaEncodedStagedSerde implements StagedSerde<SerializablePairLongString>
+{
+  private static final byte[] EMPTY_BYTES = new byte[0];
+
+  private final long minValue;
+  private final boolean useIntegerDelta;
+
+  public SerializablePairLongStringDeltaEncodedStagedSerde(long minValue, boolean useIntegerDelta)
+  {
+    this.minValue = minValue;
+    this.useIntegerDelta = useIntegerDelta;
+  }
+
+  @Override
+  public StorableBuffer serializeDelayed(@Nullable SerializablePairLongString value)
+  {
+    if (value == null) {
+      return StorableBuffer.EMPTY;
+    }
+
+    String rhsString = value.rhs;
+    byte[] rhsBytes = stringToUtf8Bytes(rhsString);
+
+    return new StorableBuffer()
+    {
+      @Override
+      public void store(ByteBuffer byteBuffer)
+      {
+        Preconditions.checkNotNull(value.lhs, "Long in SerializablePairLongString must be non-null");
+
+        long delta = value.lhs - minValue;
+
+        Preconditions.checkState(delta >= 0 || delta == value.lhs);
+
+        if (useIntegerDelta) {
+          byteBuffer.putInt(Ints.checkedCast(delta));
+        } else {
+          byteBuffer.putLong(delta);
+        }
+
+        byteBuffer.putInt(rhsBytes.length);
+
+        if (rhsBytes.length > 0) {
+          byteBuffer.put(rhsBytes);
+        }
+      }

+      @Override
+      public int getSerializedSize()
+      {
+        return (useIntegerDelta ? Integer.BYTES : Long.BYTES) + Integer.BYTES + rhsBytes.length;
+      }
+    };
+  }
+
+  @Nullable
+  @Override
+  public SerializablePairLongString deserialize(ByteBuffer byteBuffer)
+  {
+    if (byteBuffer.remaining() == 0) {
+      return null;
+    }
+
+    ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer().order(ByteOrder.nativeOrder());
+    long lhs;
+
+    if (useIntegerDelta) {
+      lhs = readOnlyBuffer.getInt();
+    } else {
+      lhs = readOnlyBuffer.getLong();
+    }
+
+    lhs += minValue;
+
+    int stringSize = readOnlyBuffer.getInt();
+    String lastString = null;
+
+    if (stringSize > 0) {
+      byte[] stringBytes = new byte[stringSize];
+
+      readOnlyBuffer.get(stringBytes, 0, stringSize);
+      lastString = StringUtils.fromUtf8(stringBytes);
+    }
+
+    return new SerializablePairLongString(lhs, lastString);
+  }
+
+  private static byte[] stringToUtf8Bytes(@Nullable String value)
+  {
+    return value == null ? EMPTY_BYTES : StringUtils.toUtf8Nullable(value);

Review Comment:
   There appears to be a `StringUtils.toUtf8WithNullToEmpty` that has exactly the semantics of this method, I think.

##########
processing/src/test/java/org/apache/druid/segment/serde/cell/CellWriterTest.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium;
+
+public class CellWriterTest extends BytesReadWriteTestBase

Review Comment:
   This is a super meta comment: this PR introduces classes `CellWriter` and `CellReader` that are intended to be reusable. There is a `CellWriter` test, but no `CellReader` test, and I haven't dug in yet to validate that there is some other test which exercises both of them at the same time.

   When people are trying to learn how to use something that is intended to be reusable, they often look for test classes that show the usage pattern. Right now, I do not know which test classes to point someone at to understand how to use `CellWriter` and `CellReader`. Even `CellWriterTest` leverages a `TestBase` that really helps with validating technical correctness, but that abstraction doesn't make it easy to see how to use the thing as a black box.

   I made other comments asking for class-level javadoc; I'd also like to suggest a test somewhere whose goal is more to show how to use the thing and less to validate technical correctness (of course, it should validate correctness too; the primary intent should just be to exemplify the APIs). The class-level javadoc can refer to that test to help solidify how the thing should be used as well.

   This suggestion is not saying that the current tests should be changed; it is suggesting the addition of a test that exemplifies the intended usage pattern.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
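For readers skimming this thread, the delta-encoded layout described in the class javadoc above (a delta from the segment's minValue stored as an int when the range fits, otherwise a long, followed by the UTF-8 length and bytes) can be sketched as a small self-contained round trip. This is an illustration only, not the PR's actual classes: `DeltaEncodedPairSketch` and its `Pair` type are hypothetical names, and plain JDK calls stand in for Druid's `StringUtils` and Guava's `Preconditions`/`Ints`.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for SerializablePairLongStringDeltaEncodedStagedSerde.
// Layout: [int-or-long delta from minValue][int utf8 length][utf8 bytes].
class DeltaEncodedPairSketch
{
  static class Pair
  {
    final long lhs;
    final String rhs;

    Pair(long lhs, String rhs)
    {
      this.lhs = lhs;
      this.rhs = rhs;
    }
  }

  private final long minValue;
  private final boolean useIntegerDelta;

  DeltaEncodedPairSketch(long minValue, boolean useIntegerDelta)
  {
    this.minValue = minValue;
    this.useIntegerDelta = useIntegerDelta;
  }

  byte[] serialize(long lhs, String rhs)
  {
    byte[] rhsBytes = rhs == null ? new byte[0] : rhs.getBytes(StandardCharsets.UTF_8);
    long delta = lhs - minValue;
    int size = (useIntegerDelta ? Integer.BYTES : Long.BYTES) + Integer.BYTES + rhsBytes.length;
    ByteBuffer buffer = ByteBuffer.allocate(size).order(ByteOrder.nativeOrder());

    if (useIntegerDelta) {
      buffer.putInt(Math.toIntExact(delta)); // throws if the delta overflows an int
    } else {
      buffer.putLong(delta);
    }

    buffer.putInt(rhsBytes.length);
    buffer.put(rhsBytes);

    return buffer.array();
  }

  Pair deserialize(byte[] bytes)
  {
    ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
    // un-delta the long half, then read the length-prefixed UTF-8 string
    long lhs = (useIntegerDelta ? buffer.getInt() : buffer.getLong()) + minValue;
    int stringSize = buffer.getInt();
    String rhs = null;

    if (stringSize > 0) {
      byte[] stringBytes = new byte[stringSize];
      buffer.get(stringBytes);
      rhs = new String(stringBytes, StandardCharsets.UTF_8);
    }

    return new Pair(lhs, rhs);
  }
}
```

Note how a zero-length prefix decodes back to a null string, matching the null-as-empty convention the reviewers discuss above.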
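As a sketch of the kind of usage-example test the last review comment asks for, here is a toy, self-contained writer/reader pair. These are NOT the PR's `CellWriter`/`CellReader` (whose real implementation is smoosh-based and compressed, with a different API); they only illustrate the black-box pattern of length-prefixing variable-size cells on write and iterating them back on read.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Toy stand-ins, not the PR's classes: each cell is stored as
// [int length][payload bytes], and cells are read back in write order.
class ToyCellWriter
{
  private final ByteArrayOutputStream out = new ByteArrayOutputStream();

  void write(byte[] cell)
  {
    byte[] length = ByteBuffer.allocate(Integer.BYTES).putInt(cell.length).array();
    out.write(length, 0, length.length);
    out.write(cell, 0, cell.length);
  }

  byte[] toBytes()
  {
    return out.toByteArray();
  }
}

class ToyCellReader
{
  private final ByteBuffer buffer;

  ToyCellReader(byte[] bytes)
  {
    this.buffer = ByteBuffer.wrap(bytes);
  }

  boolean hasNext()
  {
    return buffer.hasRemaining();
  }

  byte[] next()
  {
    byte[] cell = new byte[buffer.getInt()];
    buffer.get(cell);
    return cell;
  }
}
```

A usage-example test in this spirit would write a handful of cells, hand the serialized bytes to the reader, and assert the cells come back unchanged; that documents the intended write-then-read lifecycle more directly than a correctness-focused `TestBase` hierarchy.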
