gianm commented on code in PR #12745:
URL: https://github.com/apache/druid/pull/12745#discussion_r915244975


##########
processing/src/main/java/org/apache/druid/frame/Frame.java:
##########
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame;
+
+import com.google.common.primitives.Ints;
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4SafeDecompressor;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.druid.io.Channels;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * A data frame.
+ *
+ * Frames are split into contiguous "regions". With columnar frames ({@link 
FrameType#COLUMNAR}) each region
+ * is a column. With row-based frames ({@link FrameType#ROW_BASED}) there are 
always two regions: row offsets
+ * and row data.
+ *
+ * This object is lightweight. It has constant overhead regardless of the 
number of rows or regions.
+ *
+ * Frames are written with {@link org.apache.druid.frame.write.FrameWriter} 
and read with
+ * {@link org.apache.druid.frame.read.FrameReader}.
+ *
+ * Frame format:
+ *
+ * - 1 byte: {@link FrameType#version()}
+ * - 8 bytes: size in bytes of the frame, little-endian long
+ * - 4 bytes: number of rows, little-endian int
+ * - 4 bytes: number of regions, little-endian int
+ * - 1 byte: 0 if frame is nonpermuted, 1 if frame is permuted
+ * - 4 bytes x numRows: permutation section; only present for permuted frames. 
Array of little-endian ints mapping
+ * logical row numbers to physical row numbers.
+ * - 8 bytes x numRegions: region offsets. Array of end offsets of each region 
(exclusive), relative to start of frame,
+ * as little-endian longs.
+ * - NNN bytes: regions, back-to-back.
+ *
+ * There is also a compressed frame format. Compressed frames are written by 
{@link #writeTo} when "compress" is
+ * true, and decompressed by {@link #decompress}. Format:
+ *
+ * - 8 bytes: compressed frame length, little-endian long
+ * - 8 bytes: uncompressed frame length (numBytes), little-endian long
+ * - NNN bytes: LZ4-compressed frame
+ * - 8 bytes: 64-bit xxhash checksum of prior content, including 16-byte 
header and compressed frame, little-endian long
+ */
+public class Frame
+{
+  /**
+   * Size in bytes of the fixed-length frame header: the fields that precede the optional permutation section
+   * and the region offsets.
+   */
+  public static final long HEADER_SIZE =
+      Byte.BYTES /* version */ +
+      Long.BYTES /* total size */ +
+      Integer.BYTES /* number of rows */ +
+      Integer.BYTES /* number of regions */ +
+      Byte.BYTES /* permuted flag */;
+
+  private static final LZ4Compressor LZ4_COMPRESSOR = 
LZ4Factory.fastestInstance().fastCompressor();
+  private static final LZ4SafeDecompressor LZ4_DECOMPRESSOR = 
LZ4Factory.fastestInstance().safeDecompressor();
+  private static final int COMPRESSED_FRAME_HEADER_SIZE = Long.BYTES * 2; // 
Compressed, uncompressed lengths
+  private static final int COMPRESSED_FRAME_TRAILER_SIZE = Long.BYTES; // 
Checksum
+  private static final int COMPRESSED_FRAME_ENVELOPE_SIZE = 
COMPRESSED_FRAME_HEADER_SIZE
+                                                            + 
COMPRESSED_FRAME_TRAILER_SIZE;
+  private static final int CHECKSUM_SEED = 0;
+
+  private final Memory memory;
+  private final FrameType frameType;
+  private final long numBytes;
+  private final int numRows;
+  private final int numRegions;
+  private final boolean permuted;
+
+  /**
+   * Creates a frame from already-parsed header values. No validation is done here; {@link #wrap(Memory)} checks
+   * the header and region table before calling this constructor.
+   */
+  private Frame(Memory memory, FrameType frameType, long numBytes, int 
numRows, int numRegions, boolean permuted)
+  {
+    this.memory = memory;
+    this.frameType = frameType;
+    this.numBytes = numBytes;
+    this.numRows = numRows;
+    this.numRegions = numRegions;
+    this.permuted = permuted;
+  }
+
+  /**
+   * Returns a frame backed by the provided Memory. This operation does not do any copies or allocations.
+   *
+   * The Memory must be in little-endian byte order.
+   *
+   * Behavior is undefined if the memory is modified anytime during the lifetime of the Frame object.
+   *
+   * The header and the region offset table are validated before the frame is returned: the declared size must
+   * equal the capacity of the Memory, and region end offsets must be non-decreasing, start no earlier than the end
+   * of the preamble, and stay within the frame.
+   *
+   * @param memory little-endian memory whose full capacity is a single uncompressed frame
+   *
+   * @return a frame view over {@code memory}
+   *
+   * @throws IAE if the memory is not little-endian, is too short, starts with an unrecognized version byte,
+   *             declares a size different from its capacity, or declares a negative row or region count
+   * @throws ISE if a region end offset precedes the end of the previous region or lies past the end of the frame
+   */
+  public static Frame wrap(final Memory memory)
+  {
+    if (memory.getTypeByteOrder() != ByteOrder.LITTLE_ENDIAN) {
+      throw new IAE("Memory must be little-endian");
+    }
+
+    if (memory.getCapacity() < HEADER_SIZE) {
+      throw new IAE("Memory too short for a header");
+    }
+
+    final byte version = memory.getByte(0);
+    final FrameType frameType = FrameType.forVersion(version);
+
+    if (frameType == null) {
+      throw new IAE("Unexpected byte [%s] at start of frame", version);
+    }
+
+    final long numBytes = memory.getLong(Byte.BYTES);
+    final int numRows = memory.getInt(Byte.BYTES + Long.BYTES);
+    final int numRegions = memory.getInt(Byte.BYTES + Long.BYTES + Integer.BYTES);
+    final boolean permuted = memory.getByte(Byte.BYTES + Long.BYTES + Integer.BYTES + Integer.BYTES) != 0;
+
+    if (numBytes != memory.getCapacity()) {
+      throw new IAE("Declared size [%,d] does not match actual size [%,d]", numBytes, memory.getCapacity());
+    }
+
+    // Negative counts would make the preamble size computations below negative (or skip region validation
+    // entirely), so reject them up front.
+    if (numRows < 0 || numRegions < 0) {
+      throw new IAE("Frame declares negative counts: numRows [%,d], numRegions [%,d]", numRows, numRegions);
+    }
+
+    // Size of permuted row indices.
+    final long rowOrderSize = (permuted ? (long) numRows * Integer.BYTES : 0);
+
+    // Size of region ending positions.
+    final long regionEndSize = (long) numRegions * Long.BYTES;
+
+    final long expectedSizeForPreamble = HEADER_SIZE + rowOrderSize + regionEndSize;
+
+    if (numBytes < expectedSizeForPreamble) {
+      throw new IAE("Memory too short for preamble");
+    }
+
+    // Verify each region is wholly contained within this buffer. Regions are stored back-to-back, so each region
+    // starts where the previous one ended; the first region starts immediately after the preamble.
+    long regionStart = expectedSizeForPreamble;
+
+    for (int regionNumber = 0; regionNumber < numRegions; regionNumber++) {
+      final long regionEnd = memory.getLong(HEADER_SIZE + rowOrderSize + (long) regionNumber * Long.BYTES);
+
+      if (regionEnd < regionStart || regionEnd > numBytes) {
+        throw new ISE(
+            "Region [%d] invalid: end [%,d] out of range [%,d -> %,d]",
+            regionNumber,
+            regionEnd,
+            regionStart,
+            numBytes
+        );
+      }
+
+      // regionEnd is now known to lie within [regionStart, numBytes], so it is a valid start for the next region.
+      regionStart = regionEnd;
+    }
+
+    return new Frame(memory, frameType, numBytes, numRows, numRegions, permuted);
+  }
+
+  /**
+   * Returns a frame backed by the provided ByteBuffer. No copies or allocations of frame data are made.
+   *
+   * The buffer's position and limit are not consulted. To have them respected, pass the result of
+   * {@link ByteBuffer#slice()} instead, or use {@link #wrap(Memory)} to wrap a particular region.
+   */
+  public static Frame wrap(final ByteBuffer buffer)
+  {
+    // Frames are little-endian, so view the buffer in that order regardless of the buffer's own order setting.
+    final Memory littleEndianView = Memory.wrap(buffer, ByteOrder.LITTLE_ENDIAN);
+    return wrap(littleEndianView);
+  }
+
+  /**
+   * Returns a frame backed by the provided byte array. The array contents are not copied.
+   *
+   * The entire array is wrapped. To wrap only part of an array, use {@link #wrap(Memory)} with a Memory covering
+   * that particular region.
+   */
+  public static Frame wrap(final byte[] bytes)
+  {
+    // Go through ByteBuffer rather than wrapping the array with Memory directly. It looks redundant, since the
+    // ByteBuffer is re-wrapped with Memory anyway, but it enables zero-copy optimizations (search for
+    // "hasByteBuffer").
+    final ByteBuffer arrayBackedBuffer = ByteBuffer.wrap(bytes);
+    return wrap(arrayBackedBuffer);
+  }
+
+  /**
+   * Decompresses the provided memory and returns a frame backed by that 
decompressed memory. This operation is
+   * safe even on corrupt data: it validates position, length, and checksum 
prior to decompressing.
+   *
+   * This operation allocates memory on-heap to store the decompressed frame.
+   */
+  public static Frame decompress(final Memory memory, final long position, 
final long length)

Review Comment:
   Ah, good point. We have an interface for this: CompressionStrategy. There are 
implementations for LZF, LZ4, ZSTD, and UNCOMPRESSED. I didn't use it because 
it's oriented around ByteBuffer, not Memory, and some work would be needed to 
make it work properly on Memory. I also didn't see a case where we'd actually 
want to use any non-LZ4 compressor at this time.
   
   I'm thinking that if we don't migrate CompressionStrategy to Memory now, we 
should at least future-proof this compressed frame format by adding an 
additional byte for compression strategy. (CompressionStrategy impls are 
identified by a byte code.) It would always be LZ4 (0x1) at first, so it's 
compatible with a world where CompressionStrategy takes over.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to