gianm commented on code in PR #12745:
URL: https://github.com/apache/druid/pull/12745#discussion_r917024187


##########
processing/src/test/java/org/apache/druid/frame/field/StringFieldReaderTest.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.field;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.extraction.SubstringDimExtractionFn;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionDictionarySelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.data.RangeIndexedInts;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.mockito.quality.Strictness;
+
+import java.util.Collections;
+import java.util.List;
+
+public class StringFieldReaderTest extends InitializedNullHandlingTest
+{
+  private static final long MEMORY_POSITION = 1;
+
+  @Rule
+  public MockitoRule mockitoRule = 
MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
+
+  @Mock
+  public DimensionSelector writeSelector;
+
+  private WritableMemory memory;
+  private FieldWriter fieldWriter;
+
+  @Before
+  public void setUp()
+  {
+    memory = WritableMemory.allocate(1000);
+    fieldWriter = new StringFieldWriter(writeSelector);
+  }
+
+  @After
+  public void tearDown()
+  {
+    fieldWriter.close();
+  }
+
+  @Test
+  public void test_makeColumnValueSelector_singleString_notArray()
+  {
+    writeToMemory(Collections.singletonList("foo"));

Review Comment:
   > Do we have tests that show how to write a row of, say, a string, long and 
array? That would show how we coordinate the offsets, how we write the values 
together, and how we read the entire row.
   
   There isn't one for the field writers, since the field writers aren't aware of 
the row format. But! For an example, check out 
`RowBasedFrameWriter#writeDataUsingFieldWriters`, the main production usage.



##########
processing/src/main/java/org/apache/druid/frame/Frame.java:
##########
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame;
+
+import com.google.common.primitives.Ints;
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4SafeDecompressor;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.druid.io.Channels;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * A data frame.
+ *
+ * Frames are split into contiguous "regions". With columnar frames ({@link 
FrameType#COLUMNAR}) each region
+ * is a column. With row-based frames ({@link FrameType#ROW_BASED}) there are 
always two regions: row offsets
+ * and row data.
+ *
+ * This object is lightweight. It has constant overhead regardless of the 
number of rows or regions.
+ *
+ * Frames wrap some {@link Memory}. If the memory is backed by a resource that 
requires explicit releasing, such as
+ * direct off-heap memory or a memory-mapped file, the creator of the Memory 
is responsible for releasing that resource
+ * when the frame is no longer needed.
+ *
+ * Frames are written with {@link org.apache.druid.frame.write.FrameWriter} 
and read with
+ * {@link org.apache.druid.frame.read.FrameReader}.
+ *
+ * Frame format:
+ *
+ * - 1 byte: {@link FrameType#version()}
+ * - 8 bytes: size in bytes of the frame, little-endian long
+ * - 4 bytes: number of rows, little-endian int
+ * - 4 bytes: number of regions, little-endian int
+ * - 1 byte: 0 if frame is nonpermuted, 1 if frame is permuted
+ * - 4 bytes x numRows: permutation section; only present for permuted frames. 
Array of little-endian ints mapping
+ * logical row numbers to physical row numbers.
+ * - 8 bytes x numRegions: region offsets. Array of end offsets of each region 
(exclusive), relative to start of frame,
+ * as little-endian longs.
+ * - NNN bytes: regions, back-to-back.
+ *
+ * There is also a compressed frame format. Compressed frames are written by 
{@link #writeTo} when "compress" is
+ * true, and decompressed by {@link #decompress}. Format:
+ *
+ * - 8 bytes: compressed frame length, little-endian long
+ * - 8 bytes: uncompressed frame length (numBytes), little-endian long
+ * - NNN bytes: LZ4-compressed frame
+ * - 8 bytes: 64-bit xxhash checksum of prior content, including 16-byte 
header and compressed frame, little-endian long
+ */
+public class Frame
+{
+  public static final long HEADER_SIZE =
+      Byte.BYTES /* version */ +
+      Long.BYTES /* total size */ +
+      Integer.BYTES /* number of rows */ +
+      Integer.BYTES /* number of columns */ +
+      Byte.BYTES /* permuted flag */;
+
+  private static final LZ4Compressor LZ4_COMPRESSOR = 
LZ4Factory.fastestInstance().fastCompressor();
+  private static final LZ4SafeDecompressor LZ4_DECOMPRESSOR = 
LZ4Factory.fastestInstance().safeDecompressor();
+  private static final int COMPRESSED_FRAME_HEADER_SIZE = Long.BYTES * 2; // 
Compressed, uncompressed lengths
+  private static final int COMPRESSED_FRAME_TRAILER_SIZE = Long.BYTES; // 
Checksum
+  private static final int COMPRESSED_FRAME_ENVELOPE_SIZE = 
COMPRESSED_FRAME_HEADER_SIZE
+                                                            + 
COMPRESSED_FRAME_TRAILER_SIZE;
+  private static final int CHECKSUM_SEED = 0;
+
+  private final Memory memory;
+  private final FrameType frameType;
+  private final long numBytes;
+  private final int numRows;
+  private final int numRegions;
+  private final boolean permuted;
+
+  private Frame(Memory memory, FrameType frameType, long numBytes, int 
numRows, int numRegions, boolean permuted)
+  {
+    this.memory = memory;
+    this.frameType = frameType;
+    this.numBytes = numBytes;
+    this.numRows = numRows;
+    this.numRegions = numRegions;
+    this.permuted = permuted;
+  }
+
+  /**
+   * Returns a frame backed by the provided Memory. This operation does not do 
any copies or allocations.
+   *
+   * The Memory must be in little-endian byte order.
+   *
+   * Behavior is undefined if the memory is modified anytime during the 
lifetime of the Frame object.
+   */
+  public static Frame wrap(final Memory memory)
+  {
+    if (memory.getTypeByteOrder() != ByteOrder.LITTLE_ENDIAN) {
+      throw new IAE("Memory must be little-endian");
+    }
+
+    if (memory.getCapacity() < HEADER_SIZE) {
+      throw new IAE("Memory too short for a header");
+    }
+
+    final byte version = memory.getByte(0);
+    final FrameType frameType = FrameType.forVersion(version);
+
+    if (frameType == null) {
+      throw new IAE("Unexpected byte [%s] at start of frame", version);
+    }
+
+    final long numBytes = memory.getLong(Byte.BYTES);
+    final int numRows = memory.getInt(Byte.BYTES + Long.BYTES);
+    final int numRegions = memory.getInt(Byte.BYTES + Long.BYTES + 
Integer.BYTES);
+    final boolean permuted = memory.getByte(Byte.BYTES + Long.BYTES + 
Integer.BYTES + Integer.BYTES) != 0;
+
+    if (numBytes != memory.getCapacity()) {
+      throw new IAE("Declared size [%,d] does not match actual size [%,d]", 
numBytes, memory.getCapacity());
+    }
+
+    // Size of permuted row indices.
+    final long rowOrderSize = (permuted ? (long) numRows * Integer.BYTES : 0);
+
+    // Size of region ending positions.
+    final long regionEndSize = (long) numRegions * Long.BYTES;
+
+    final long expectedSizeForPreamble = HEADER_SIZE + rowOrderSize + 
regionEndSize;
+
+    if (numBytes < expectedSizeForPreamble) {
+      throw new IAE("Memory too short for preamble");
+    }
+
+    // Verify each region is wholly contained within this buffer.
+    long regionStart = expectedSizeForPreamble; // First region starts 
immediately after preamble.
+    long regionEnd;
+
+    for (int regionNumber = 0; regionNumber < numRegions; regionNumber++) {
+      regionEnd = memory.getLong(HEADER_SIZE + rowOrderSize + (long) 
regionNumber * Long.BYTES);
+
+      if (regionEnd < regionStart || regionEnd > numBytes) {
+        throw new ISE(

Review Comment:
   Hmm. Sure. I changed them.



##########
processing/src/main/java/org/apache/druid/frame/Frame.java:
##########
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame;
+
+import com.google.common.primitives.Ints;
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4SafeDecompressor;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.druid.io.Channels;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * A data frame.
+ *
+ * Frames are split into contiguous "regions". With columnar frames ({@link 
FrameType#COLUMNAR}) each region
+ * is a column. With row-based frames ({@link FrameType#ROW_BASED}) there are 
always two regions: row offsets
+ * and row data.
+ *
+ * This object is lightweight. It has constant overhead regardless of the 
number of rows or regions.
+ *
+ * Frames wrap some {@link Memory}. If the memory is backed by a resource that 
requires explicit releasing, such as
+ * direct off-heap memory or a memory-mapped file, the creator of the Memory 
is responsible for releasing that resource
+ * when the frame is no longer needed.
+ *
+ * Frames are written with {@link org.apache.druid.frame.write.FrameWriter} 
and read with
+ * {@link org.apache.druid.frame.read.FrameReader}.
+ *
+ * Frame format:
+ *
+ * - 1 byte: {@link FrameType#version()}
+ * - 8 bytes: size in bytes of the frame, little-endian long
+ * - 4 bytes: number of rows, little-endian int
+ * - 4 bytes: number of regions, little-endian int
+ * - 1 byte: 0 if frame is nonpermuted, 1 if frame is permuted
+ * - 4 bytes x numRows: permutation section; only present for permuted frames. 
Array of little-endian ints mapping
+ * logical row numbers to physical row numbers.
+ * - 8 bytes x numRegions: region offsets. Array of end offsets of each region 
(exclusive), relative to start of frame,
+ * as little-endian longs.
+ * - NNN bytes: regions, back-to-back.
+ *
+ * There is also a compressed frame format. Compressed frames are written by 
{@link #writeTo} when "compress" is
+ * true, and decompressed by {@link #decompress}. Format:
+ *
+ * - 8 bytes: compressed frame length, little-endian long
+ * - 8 bytes: uncompressed frame length (numBytes), little-endian long
+ * - NNN bytes: LZ4-compressed frame
+ * - 8 bytes: 64-bit xxhash checksum of prior content, including 16-byte 
header and compressed frame, little-endian long
+ */
+public class Frame
+{
+  public static final long HEADER_SIZE =
+      Byte.BYTES /* version */ +
+      Long.BYTES /* total size */ +
+      Integer.BYTES /* number of rows */ +
+      Integer.BYTES /* number of columns */ +
+      Byte.BYTES /* permuted flag */;
+
+  private static final LZ4Compressor LZ4_COMPRESSOR = 
LZ4Factory.fastestInstance().fastCompressor();
+  private static final LZ4SafeDecompressor LZ4_DECOMPRESSOR = 
LZ4Factory.fastestInstance().safeDecompressor();
+  private static final int COMPRESSED_FRAME_HEADER_SIZE = Long.BYTES * 2; // 
Compressed, uncompressed lengths
+  private static final int COMPRESSED_FRAME_TRAILER_SIZE = Long.BYTES; // 
Checksum
+  private static final int COMPRESSED_FRAME_ENVELOPE_SIZE = 
COMPRESSED_FRAME_HEADER_SIZE
+                                                            + 
COMPRESSED_FRAME_TRAILER_SIZE;
+  private static final int CHECKSUM_SEED = 0;
+
+  private final Memory memory;
+  private final FrameType frameType;
+  private final long numBytes;
+  private final int numRows;
+  private final int numRegions;
+  private final boolean permuted;
+
+  private Frame(Memory memory, FrameType frameType, long numBytes, int 
numRows, int numRegions, boolean permuted)
+  {
+    this.memory = memory;
+    this.frameType = frameType;
+    this.numBytes = numBytes;
+    this.numRows = numRows;
+    this.numRegions = numRegions;
+    this.permuted = permuted;
+  }
+
+  /**
+   * Returns a frame backed by the provided Memory. This operation does not do 
any copies or allocations.
+   *
+   * The Memory must be in little-endian byte order.
+   *
+   * Behavior is undefined if the memory is modified anytime during the 
lifetime of the Frame object.
+   */
+  public static Frame wrap(final Memory memory)
+  {
+    if (memory.getTypeByteOrder() != ByteOrder.LITTLE_ENDIAN) {
+      throw new IAE("Memory must be little-endian");
+    }
+
+    if (memory.getCapacity() < HEADER_SIZE) {
+      throw new IAE("Memory too short for a header");
+    }
+
+    final byte version = memory.getByte(0);
+    final FrameType frameType = FrameType.forVersion(version);
+
+    if (frameType == null) {
+      throw new IAE("Unexpected byte [%s] at start of frame", version);
+    }
+
+    final long numBytes = memory.getLong(Byte.BYTES);
+    final int numRows = memory.getInt(Byte.BYTES + Long.BYTES);
+    final int numRegions = memory.getInt(Byte.BYTES + Long.BYTES + 
Integer.BYTES);
+    final boolean permuted = memory.getByte(Byte.BYTES + Long.BYTES + 
Integer.BYTES + Integer.BYTES) != 0;
+
+    if (numBytes != memory.getCapacity()) {

Review Comment:
   Good idea. I moved it to a new `validate` method. The code looks cleaner now.



##########
processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java:
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.allocation;
+
+import com.google.common.primitives.Ints;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.io.Channels;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * A class that allows writing to a series of Memory blocks as if they are one 
big coherent chunk of memory. Memory
+ * is allocated along the way using a {@link MemoryAllocator}. Useful for 
situations where you don't know ahead of time
+ * exactly how much memory you'll need.
+ */
+public class AppendableMemory implements Closeable
+{
+  private static final int NO_BLOCK = -1;
+
+  // Reasonable initial allocation size. Multiple of 4, 5, 8, and 9; meaning 
int, int + byte, long, and long + byte can
+  // all be packed into blocks (see blocksPackedAndInitialSize).
+  private static final int DEFAULT_INITIAL_ALLOCATION_SIZE = 360;
+
+  // Largest allocation that we will do, unless "reserve" is called with a 
number bigger than this.
+  private static final int SOFT_MAXIMUM_ALLOCATION_SIZE = 
DEFAULT_INITIAL_ALLOCATION_SIZE * 4096;
+
+  private final MemoryAllocator allocator;
+  private int nextAllocationSize;
+
+  // One holder for every Memory we've allocated.
+  private final List<ResourceHolder<WritableMemory>> blockHolders = new 
ArrayList<>();
+
+  // The amount of space that has been used from each Memory block. Same 
length as "blockHolders".
+  private final IntList limits = new IntArrayList();
+
+  // The global starting position for each Memory block (blockNumber -> 
position). Same length as "blockHolders".
+  private final LongArrayList globalStartPositions = new LongArrayList();
+
+  // Whether the blocks we've allocated are "packed"; meaning all non-final 
block limits equal the allocationSize.
+  private boolean blocksPackedAndInitialSize = true;
+
+  // Return value of cursor().
+  private final MemoryRange<WritableMemory> cursor;
+
+  private AppendableMemory(final MemoryAllocator allocator, final int 
initialAllocationSize)
+  {
+    this.allocator = allocator;
+    this.nextAllocationSize = initialAllocationSize;
+    this.cursor = new MemoryRange<>(null, 0, 0);
+  }
+
+  /**
+   * Creates an appendable memory instance with a default initial allocation 
size. This default size
+   * is a multiple of 4, 5, 8, and 9, meaning that {@link #reserve} can accept 
allocations of those sizes and
+   * remain fully-packed.
+   */
+  public static AppendableMemory create(final MemoryAllocator allocator)
+  {
+    return new AppendableMemory(allocator, DEFAULT_INITIAL_ALLOCATION_SIZE);
+  }
+
+  /**
+   * Creates an appendable memory instance using a particular initial 
allocation size.
+   */
+  public static AppendableMemory create(final MemoryAllocator allocator, final 
int initialAllocationSize)
+  {
+    return new AppendableMemory(allocator, initialAllocationSize);
+  }
+
+  /**
+   * Return a pointer to the current cursor location, which is where the next 
elements should be written. The returned
+   * object is updated on calls to {@link #advanceCursor} and {@link 
#rewindCursor}.
+   *
+   * The start of the returned range is the cursor location; the end is the 
end of the current Memory block.
+   *
+   * The returned Memory object is in little-endian order.
+   */
+  public MemoryRange<WritableMemory> cursor()
+  {
+    return cursor;
+  }
+
+  /**
+   * Ensure that at least "bytes" amount of space is available after the 
cursor. Allocates a new block if needed.
+   * Note: the amount of bytes is guaranteed to be in a *single* block.
+   *
+   * Does not move the cursor forward.
+   *
+   * @return true if reservation was successful, false otherwise.
+   */
+  public boolean reserve(final int bytes)

Review Comment:
   Sure, that makes sense. I renamed it to `reserveAdditional` and added some 
more notes about usage.



##########
processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java:
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.allocation;
+
+import com.google.common.primitives.Ints;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.io.Channels;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * A class that allows writing to a series of Memory blocks as if they are one 
big coherent chunk of memory. Memory
+ * is allocated along the way using a {@link MemoryAllocator}. Useful for 
situations where you don't know ahead of time
+ * exactly how much memory you'll need.
+ */
+public class AppendableMemory implements Closeable
+{
+  private static final int NO_BLOCK = -1;
+
+  // Reasonable initial allocation size. Multiple of 4, 5, 8, and 9; meaning 
int, int + byte, long, and long + byte can
+  // all be packed into blocks (see blocksPackedAndInitialSize).
+  private static final int DEFAULT_INITIAL_ALLOCATION_SIZE = 360;
+
+  // Largest allocation that we will do, unless "reserve" is called with a 
number bigger than this.
+  private static final int SOFT_MAXIMUM_ALLOCATION_SIZE = 
DEFAULT_INITIAL_ALLOCATION_SIZE * 4096;
+
+  private final MemoryAllocator allocator;
+  private int nextAllocationSize;
+
+  // One holder for every Memory we've allocated.
+  private final List<ResourceHolder<WritableMemory>> blockHolders = new 
ArrayList<>();
+
+  // The amount of space that has been used from each Memory block. Same 
length as "blockHolders".
+  private final IntList limits = new IntArrayList();
+
+  // The global starting position for each Memory block (blockNumber -> 
position). Same length as "blockHolders".
+  private final LongArrayList globalStartPositions = new LongArrayList();
+
+  // Whether the blocks we've allocated are "packed"; meaning all non-final 
block limits equal the allocationSize.
+  private boolean blocksPackedAndInitialSize = true;
+
+  // Return value of cursor().
+  private final MemoryRange<WritableMemory> cursor;
+
+  private AppendableMemory(final MemoryAllocator allocator, final int 
initialAllocationSize)
+  {
+    this.allocator = allocator;
+    this.nextAllocationSize = initialAllocationSize;
+    this.cursor = new MemoryRange<>(null, 0, 0);
+  }
+
+  /**
+   * Creates an appendable memory instance with a default initial allocation 
size. This default size
+   * is a multiple of 4, 5, 8, and 9, meaning that {@link #reserve} can accept 
allocations of those sizes and
+   * remain fully-packed.
+   */
+  public static AppendableMemory create(final MemoryAllocator allocator)
+  {
+    return new AppendableMemory(allocator, DEFAULT_INITIAL_ALLOCATION_SIZE);
+  }
+
+  /**
+   * Creates an appendable memory instance using a particular initial 
allocation size.
+   */
+  public static AppendableMemory create(final MemoryAllocator allocator, final 
int initialAllocationSize)
+  {
+    return new AppendableMemory(allocator, initialAllocationSize);
+  }
+
+  /**
+   * Return a pointer to the current cursor location, which is where the next 
elements should be written. The returned
+   * object is updated on calls to {@link #advanceCursor} and {@link 
#rewindCursor}.
+   *
+   * The start of the returned range is the cursor location; the end is the 
end of the current Memory block.
+   *
+   * The returned Memory object is in little-endian order.
+   */
+  public MemoryRange<WritableMemory> cursor()
+  {
+    return cursor;
+  }
+
+  /**
+   * Ensure that at least "bytes" amount of space is available after the 
cursor. Allocates a new block if needed.
+   * Note: the amount of bytes is guaranteed to be in a *single* block.
+   *
+   * Does not move the cursor forward.
+   *
+   * @return true if reservation was successful, false otherwise.
+   */
+  public boolean reserve(final int bytes)
+  {
+    if (bytes < 0) {
+      throw new IAE("Cannot reserve negative bytes");
+    }
+
+    if (bytes == 0) {
+      return true;
+    }
+
+    if (bytes > allocator.available()) {
+      return false;
+    }
+
+    final int idx = blockHolders.size() - 1;
+
+    if (idx < 0 || bytes + limits.getInt(idx) > 
blockHolders.get(idx).get().getCapacity()) {
+      // Allocation needed.
+      // Math.max(allocationSize, bytes) in case "bytes" is greater than 
SOFT_MAXIMUM_ALLOCATION_SIZE.
+      final Optional<ResourceHolder<WritableMemory>> newMemory =
+          allocator.allocate(Math.max(nextAllocationSize, bytes));
+
+      if (!newMemory.isPresent()) {
+        return false;
+      } else if (newMemory.get().get().getCapacity() < bytes) {
+        // Not enough space in the allocation.
+        newMemory.get().close();
+        return false;
+      } else {
+        addBlock(newMemory.get());
+
+        if (!blocksPackedAndInitialSize && nextAllocationSize < 
SOFT_MAXIMUM_ALLOCATION_SIZE) {
+          // Increase size each time we add an allocation, to minimize the 
number of overall allocations.
+          nextAllocationSize *= 2;
+        }
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Advances the cursor a certain number of bytes. This number of bytes must 
not exceed the space available in the
+   * current block. Typically, it is used to commit the memory most recently 
reserved by {@link #reserve}.
+   */
+  public void advanceCursor(final int bytes)
+  {
+    final int blockNumber = currentBlockNumber();
+
+    if (blockNumber < 0) {
+      throw new ISE("No memory; must call 'reserve' first");
+    }
+
+    final int currentLimit = limits.getInt(blockNumber);
+    final WritableMemory currentBlockMemory = 
blockHolders.get(blockNumber).get();
+    final long available = currentBlockMemory.getCapacity() - currentLimit;
+
+    if (bytes > available) {
+      throw new IAE(
+          "Cannot advance [%d] bytes; current block only has [%d] additional 
bytes",
+          bytes,
+          available
+      );
+    }
+
+    final int newLimit = currentLimit + bytes;
+    limits.set(blockNumber, newLimit);
+    cursor.set(currentBlockMemory, newLimit, currentBlockMemory.getCapacity() 
- newLimit);
+  }
+
+  /**
+   * Rewinds the cursor a certain number of bytes, effectively erasing them. 
This number of bytes must not exceed
+   * the current block. Typically, it is used to erase the memory most 
recently advanced by {@link #advanceCursor}.
+   */
+  public void rewindCursor(final int bytes)

Review Comment:
   The expected usage is that callers will call reserveAdditional, then write 
some stuff, then call advanceCursor. Then, they might want to undo it. (This 
happens when undoing the most recent row written.) Callers don't need to undo 
more than the most recent call to advanceCursor, so the block boundary thing 
isn't a problem in practice. I wrote some javadocs clarifying this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to