gianm commented on code in PR #12745:
URL: https://github.com/apache/druid/pull/12745#discussion_r917024444


##########
processing/src/main/java/org/apache/druid/frame/Frame.java:
##########
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame;
+
+import com.google.common.primitives.Ints;
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4SafeDecompressor;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.druid.io.Channels;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * A data frame.
+ *
+ * Frames are split into contiguous "regions". With columnar frames ({@link 
FrameType#COLUMNAR}) each region
+ * is a column. With row-based frames ({@link FrameType#ROW_BASED}) there are 
always two regions: row offsets
+ * and row data.
+ *
+ * This object is lightweight. It has constant overhead regardless of the 
number of rows or regions.
+ *
+ * Frames are written with {@link org.apache.druid.frame.write.FrameWriter} 
and read with
+ * {@link org.apache.druid.frame.read.FrameReader}.
+ *
+ * Frame format:

Review Comment:
   I left this as-is for now. But I added this comment:
   
   > Note to developers: if we end up needing to add more fields here, consider 
introducing a Smile (or Protobuf, etc) header to make it simpler to add more 
fields.



##########
processing/src/main/java/org/apache/druid/frame/Frame.java:
##########
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame;
+
+import com.google.common.primitives.Ints;
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4SafeDecompressor;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.druid.io.Channels;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * A data frame.
+ *
+ * Frames are split into contiguous "regions". With columnar frames ({@link 
FrameType#COLUMNAR}) each region
+ * is a column. With row-based frames ({@link FrameType#ROW_BASED}) there are 
always two regions: row offsets
+ * and row data.
+ *
+ * This object is lightweight. It has constant overhead regardless of the 
number of rows or regions.
+ *
+ * Frames are written with {@link org.apache.druid.frame.write.FrameWriter} 
and read with
+ * {@link org.apache.druid.frame.read.FrameReader}.
+ *
+ * Frame format:
+ *
+ * - 1 byte: {@link FrameType#version()}
+ * - 8 bytes: size in bytes of the frame, little-endian long
+ * - 4 bytes: number of rows, little-endian int
+ * - 4 bytes: number of regions, little-endian int
+ * - 1 byte: 0 if frame is nonpermuted, 1 if frame is permuted
+ * - 4 bytes x numRows: permutation section; only present for permuted frames. 
Array of little-endian ints mapping
+ * logical row numbers to physical row numbers.
+ * - 8 bytes x numRegions: region offsets. Array of end offsets of each region 
(exclusive), relative to start of frame,
+ * as little-endian longs.
+ * - NNN bytes: regions, back-to-back.
+ *
+ * There is also a compressed frame format. Compressed frames are written by 
{@link #writeTo} when "compress" is
+ * true, and decompressed by {@link #decompress}. Format:
+ *
+ * - 8 bytes: compressed frame length, little-endian long
+ * - 8 bytes: uncompressed frame length (numBytes), little-endian long
+ * - NNN bytes: LZ4-compressed frame
+ * - 8 bytes: 64-bit xxhash checksum of prior content, including 16-byte 
header and compressed frame, little-endian long
+ */
+public class Frame
+{
+  public static final long HEADER_SIZE =
+      Byte.BYTES /* version */ +
+      Long.BYTES /* total size */ +
+      Integer.BYTES /* number of rows */ +
+      Integer.BYTES /* number of columns */ +
+      Byte.BYTES /* permuted flag */;
+
+  private static final LZ4Compressor LZ4_COMPRESSOR = 
LZ4Factory.fastestInstance().fastCompressor();
+  private static final LZ4SafeDecompressor LZ4_DECOMPRESSOR = 
LZ4Factory.fastestInstance().safeDecompressor();
+  private static final int COMPRESSED_FRAME_HEADER_SIZE = Long.BYTES * 2; // 
Compressed, uncompressed lengths
+  private static final int COMPRESSED_FRAME_TRAILER_SIZE = Long.BYTES; // 
Checksum
+  private static final int COMPRESSED_FRAME_ENVELOPE_SIZE = 
COMPRESSED_FRAME_HEADER_SIZE
+                                                            + 
COMPRESSED_FRAME_TRAILER_SIZE;
+  private static final int CHECKSUM_SEED = 0;
+
+  private final Memory memory;
+  private final FrameType frameType;
+  private final long numBytes;
+  private final int numRows;
+  private final int numRegions;
+  private final boolean permuted;
+
+  private Frame(Memory memory, FrameType frameType, long numBytes, int 
numRows, int numRegions, boolean permuted)
+  {
+    this.memory = memory;
+    this.frameType = frameType;
+    this.numBytes = numBytes;
+    this.numRows = numRows;
+    this.numRegions = numRegions;
+    this.permuted = permuted;
+  }
+
+  /**
+   * Returns a frame backed by the provided Memory. This operation does not do 
any copies or allocations.
+   *
+   * The Memory must be in little-endian byte order.
+   *
+   * Behavior is undefined if the memory is modified anytime during the 
lifetime of the Frame object.
+   */
+  public static Frame wrap(final Memory memory)
+  {
+    if (memory.getTypeByteOrder() != ByteOrder.LITTLE_ENDIAN) {
+      throw new IAE("Memory must be little-endian");
+    }
+
+    if (memory.getCapacity() < HEADER_SIZE) {
+      throw new IAE("Memory too short for a header");
+    }
+
+    final byte version = memory.getByte(0);
+    final FrameType frameType = FrameType.forVersion(version);
+
+    if (frameType == null) {
+      throw new IAE("Unexpected byte [%s] at start of frame", version);
+    }
+
+    final long numBytes = memory.getLong(Byte.BYTES);
+    final int numRows = memory.getInt(Byte.BYTES + Long.BYTES);
+    final int numRegions = memory.getInt(Byte.BYTES + Long.BYTES + 
Integer.BYTES);
+    final boolean permuted = memory.getByte(Byte.BYTES + Long.BYTES + 
Integer.BYTES + Integer.BYTES) != 0;
+
+    if (numBytes != memory.getCapacity()) {
+      throw new IAE("Declared size [%,d] does not match actual size [%,d]", 
numBytes, memory.getCapacity());
+    }
+
+    // Size of permuted row indices.
+    final long rowOrderSize = (permuted ? (long) numRows * Integer.BYTES : 0);
+
+    // Size of region ending positions.
+    final long regionEndSize = (long) numRegions * Long.BYTES;
+
+    final long expectedSizeForPreamble = HEADER_SIZE + rowOrderSize + 
regionEndSize;
+
+    if (numBytes < expectedSizeForPreamble) {
+      throw new IAE("Memory too short for preamble");
+    }
+
+    // Verify each region is wholly contained within this buffer.
+    long regionStart = expectedSizeForPreamble; // First region starts 
immediately after preamble.
+    long regionEnd;
+
+    for (int regionNumber = 0; regionNumber < numRegions; regionNumber++) {
+      regionEnd = memory.getLong(HEADER_SIZE + rowOrderSize + (long) 
regionNumber * Long.BYTES);
+
+      if (regionEnd < regionStart || regionEnd > numBytes) {
+        throw new ISE(
+            "Region [%d] invalid: end [%,d] out of range [%,d -> %,d]",
+            regionNumber,
+            regionEnd,
+            expectedSizeForPreamble,
+            numBytes
+        );
+      }
+
+      if (regionNumber == 0) {
+        regionStart = expectedSizeForPreamble;
+      } else {
+        regionStart = memory.getLong(HEADER_SIZE + rowOrderSize + (long) 
(regionNumber - 1) * Long.BYTES);
+      }
+
+      if (regionStart < expectedSizeForPreamble || regionStart > numBytes) {
+        throw new ISE(
+            "Region [%d] invalid: start [%,d] out of range [%,d -> %,d]",
+            regionNumber,
+            regionStart,
+            expectedSizeForPreamble,
+            numBytes
+        );
+      }
+    }
+
+    return new Frame(memory, frameType, numBytes, numRows, numRegions, 
permuted);
+  }
+
+  /**
+   * Returns a frame backed by the provided ByteBuffer. This operation does 
not do any copies or allocations.
+   *
+   * The position and limit of the buffer are ignored. If you need them to be 
respected, call
+   * {@link ByteBuffer#slice()} first, or use {@link #wrap(Memory)} to wrap a 
particular region.
+   */
+  public static Frame wrap(final ByteBuffer buffer)
+  {
+    return wrap(Memory.wrap(buffer, ByteOrder.LITTLE_ENDIAN));
+  }
+
+  /**
+   * Returns a frame backed by the provided byte array. This operation does 
not do any copies or allocations.
+   *
+   * The position and limit of the buffer are ignored. If you need them to be 
respected, call
+   * {@link ByteBuffer#slice()} first, or use {@link #wrap(Memory)} to wrap a 
particular region.
+   */
+  public static Frame wrap(final byte[] bytes)
+  {
+    // Wrap using ByteBuffer, not Memory. Even though it's seemingly 
unnecessary, because ByteBuffers are re-wrapped
+    // with Memory anyway, this is beneficial because it enables zero-copy 
optimizations (search for "hasByteBuffer").
+    return wrap(ByteBuffer.wrap(bytes));
+  }
+
+  /**
+   * Decompresses the provided memory and returns a frame backed by that 
decompressed memory. This operation is
+   * safe even on corrupt data: it validates position, length, and checksum 
prior to decompressing.
+   *
+   * This operation allocates memory on-heap to store the decompressed frame.
+   */
+  public static Frame decompress(final Memory memory, final long position, 
final long length)

Review Comment:
   I added a byte at the start of the compressed format to indicate the 
compression strategy. This will be helpful if we want to implement additional 
compression strategies in the future. However, right now, it's always going to 
be LZ4 (0x1), since I didn't do the work to migrate CompressionStrategy to 
Memory.



##########
processing/src/main/java/org/apache/druid/frame/field/DoubleFieldReader.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.field;
+
+import org.apache.datasketches.memory.Memory;
+import org.apache.druid.query.extraction.ExtractionFn;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.DoubleColumnSelector;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.column.ValueTypes;
+
+import javax.annotation.Nullable;
+
+/**
+ * Reads values written by {@link DoubleFieldWriter}.
+ */
+public class DoubleFieldReader implements FieldReader
+{
+  DoubleFieldReader()
+  {
+  }
+
+  @Override
+  public ColumnValueSelector<?> makeColumnValueSelector(Memory memory, 
ReadableFieldPointer fieldPointer)
+  {
+    return new Selector(memory, fieldPointer);
+  }
+
+  @Override
+  public DimensionSelector makeDimensionSelector(
+      Memory memory,
+      ReadableFieldPointer fieldPointer,
+      @Nullable ExtractionFn extractionFn
+  )
+  {
+    return ValueTypes.makeNumericWrappingDimensionSelector(
+        ValueType.DOUBLE,
+        makeColumnValueSelector(memory, fieldPointer),
+        extractionFn
+    );
+  }
+
+  @Override
+  public boolean isComparable()
+  {
+    return true;
+  }
+
+  private static class Selector implements DoubleColumnSelector
+  {
+    private final Memory dataRegion;
+    private final ReadableFieldPointer fieldPointer;

Review Comment:
   > A few more comments would be helpful here.
   
   I added some brief comments. Please let me know if you had something else in 
mind.
   
   > It can be handy to have a "row" context pointer. If we know we're reading 
row 5, we do the math: 5 * (4 + 1) = 25 is the byte offset.
   
   With the way the code is currently structured, the reader doesn't need to 
know it's part of a row at all, and doesn't need to be aware of the row format. 
I thought that was a nice property. Please let me know if I'm missing something.



##########
processing/src/main/java/org/apache/druid/frame/field/DoubleFieldWriter.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.field;
+
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.druid.segment.BaseDoubleColumnValueSelector;
+
+/**
+ * Wraps a {@link BaseDoubleColumnValueSelector} and writes field values.
+ *
+ * Values are transformed such that they are comparable as bytes; see {@link 
#transform} and {@link #detransform}.
+ * Values are preceded by a null byte that is either 0x00 (null) or 0x01 (not 
null). This ensures that nulls sort
+ * earlier than nonnulls.
+ */
+public class DoubleFieldWriter implements FieldWriter
+{
+  public static final int SIZE = Double.BYTES + Byte.BYTES;
+  public static final byte NULL_BYTE = 0x00;
+  public static final byte NOT_NULL_BYTE = 0x01;
+
+  private final BaseDoubleColumnValueSelector selector;
+
+  public DoubleFieldWriter(final BaseDoubleColumnValueSelector selector)
+  {
+    this.selector = selector;
+  }
+
+  @Override
+  public long writeTo(final WritableMemory memory, final long position, final 
long maxSize)

Review Comment:
   RowBasedFrameWriter is the main place this is used; it uses the return value 
of these field-writer functions (bytes written) to increase a single 
`bytesWritten` pointer that it tracks. It seems clean to me but please let me 
know if I'm missing something.



##########
processing/src/main/java/org/apache/druid/frame/field/RowMemoryFieldPointer.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.field;
+
+import org.apache.datasketches.memory.Memory;
+import org.apache.druid.frame.segment.row.ReadableFrameRowPointer;
+
+/**
+ * A {@link ReadableFieldPointer} that is derived from a row-based frame.
+ */
+public class RowMemoryFieldPointer implements ReadableFieldPointer
+{
+  private final Memory memory;
+  private final ReadableFrameRowPointer rowPointer;
+  private final int fieldNumber;
+  private final int fieldCount;
+
+  public RowMemoryFieldPointer(
+      final Memory memory,
+      final ReadableFrameRowPointer rowPointer,
+      final int fieldNumber,
+      final int fieldCount
+  )
+  {
+    this.memory = memory;
+    this.rowPointer = rowPointer;
+    this.fieldNumber = fieldNumber;
+    this.fieldCount = fieldCount;
+  }
+
+  @Override
+  public long position()
+  {
+    if (fieldNumber == 0) {
+      return rowPointer.position() + (long) Integer.BYTES * fieldCount;
+    } else {
+      return rowPointer.position() + memory.getInt(rowPointer.position() + 
(long) Integer.BYTES * (fieldNumber - 1));
+    }

Review Comment:
   I added some comments here, and added a format description to RowReader.
   
   Quick answer is: a row with N fields consists of N pointers (to the end 
of each field), followed by the N fields concatenated together.



##########
processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.field;
+
+import com.google.common.base.Predicate;
+import com.google.common.primitives.Ints;
+import org.apache.datasketches.memory.Memory;
+import org.apache.druid.frame.read.FrameReaderUtils;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.extraction.ExtractionFn;
+import org.apache.druid.query.filter.ValueMatcher;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.DimensionSelectorUtils;
+import org.apache.druid.segment.IdLookup;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.data.RangeIndexedInts;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Reads fields written by {@link StringFieldWriter} or {@link 
StringArrayFieldWriter}.
+ */

Review Comment:
   Fair! I added some details to all the Reader implementations, and linked to 
them from the Writers, to make them easier to find. Quick note for this one: 
it's an actual string, done that way to enable sorting on keys as bytes 
directly. (So, we don't want indirection.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to