gianm commented on code in PR #12745:
URL: https://github.com/apache/druid/pull/12745#discussion_r917025031


##########
processing/src/main/java/org/apache/druid/frame/segment/row/CursorFrameRowPointer.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.segment.row;
+
+import com.google.common.base.Preconditions;
+import org.apache.datasketches.memory.Memory;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.write.RowBasedFrameWriter;
+import org.apache.druid.segment.data.ReadableOffset;
+

Review Comment:
   I added a few comments. Hopefully they make sense. Please let me know if 
it's still murky.



##########
processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java:
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.allocation;
+
+import com.google.common.primitives.Ints;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.io.Channels;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * A class that allows writing to a series of Memory blocks as if they are one 
big coherent chunk of memory. Memory
+ * is allocated along the way using a {@link MemoryAllocator}. Useful for 
situations where you don't know ahead of time
+ * exactly how much memory you'll need.
+ */
+public class AppendableMemory implements Closeable
+{
+  private static final int NO_BLOCK = -1;
+
+  // Reasonable initial allocation size. Multiple of 4, 5, 8, and 9; meaning 
int, int + byte, long, and long + byte can
+  // all be packed into blocks (see blocksPackedAndInitialSize).
+  private static final int DEFAULT_INITIAL_ALLOCATION_SIZE = 360;
+
+  // Largest allocation that we will do, unless "reserve" is called with a 
number bigger than this.
+  private static final int SOFT_MAXIMUM_ALLOCATION_SIZE = 
DEFAULT_INITIAL_ALLOCATION_SIZE * 4096;
+
+  private final MemoryAllocator allocator;
+  private int nextAllocationSize;
+
+  // One holder for every Memory we've allocated.
+  private final List<ResourceHolder<WritableMemory>> blockHolders = new 
ArrayList<>();
+
+  // The amount of space that has been used from each Memory block. Same 
length as "memoryHolders".
+  private final IntList limits = new IntArrayList();
+
+  // The global starting position for each Memory block (blockNumber -> 
position). Same length as "memoryHolders".
+  private final LongArrayList globalStartPositions = new LongArrayList();
+
+  // Whether the blocks we've allocated are "packed"; meaning all non-final 
block limits equal the allocationSize.
+  private boolean blocksPackedAndInitialSize = true;
+
+  // Return value of cursor().
+  private final MemoryRange<WritableMemory> cursor;
+
+  private AppendableMemory(final MemoryAllocator allocator, final int 
initialAllocationSize)
+  {
+    this.allocator = allocator;
+    this.nextAllocationSize = initialAllocationSize;
+    this.cursor = new MemoryRange<>(null, 0, 0);
+  }
+
+  /**
+   * Creates an appendable memory instance with a default initial allocation 
size. This default size can accept
+   * is a multiple of 4, 5, 8, and 9, meaning that {@link #reserve} can accept 
allocations of that size and
+   * remain fully-packed.
+   */
+  public static AppendableMemory create(final MemoryAllocator allocator)
+  {
+    return new AppendableMemory(allocator, DEFAULT_INITIAL_ALLOCATION_SIZE);
+  }
+
+  /**
+   * Creates an appendable memory instance using a particular initial 
allocation size.
+   */
+  public static AppendableMemory create(final MemoryAllocator allocator, final 
int initialAllocationSize)
+  {
+    return new AppendableMemory(allocator, initialAllocationSize);
+  }
+
+  /**
+   * Return a pointer to the current cursor location, which is where the next 
elements should be written. The returned
+   * object is updated on calls to {@link #advanceCursor} and {@link 
#rewindCursor}.
+   *
+   * The start of the returned range is the cursor location; the end is the 
end of the current Memory block.
+   *
+   * The returned Memory object is in little-endian order.
+   */
+  public MemoryRange<WritableMemory> cursor()
+  {
+    return cursor;
+  }
+
+  /**
+   * Ensure that at least "bytes" amount of space is available after the 
cursor. Allocates a new block if needed.
+   * Note: the amount of bytes is guaranteed to be in a *single* block.
+   *
+   * Does not move the cursor forward.
+   *
+   * @return true if reservation was successful, false otherwise.
+   */
+  public boolean reserve(final int bytes)
+  {
+    if (bytes < 0) {
+      throw new IAE("Cannot reserve negative bytes");
+    }
+
+    if (bytes == 0) {
+      return true;
+    }
+
+    if (bytes > allocator.available()) {
+      return false;
+    }
+
+    final int idx = blockHolders.size() - 1;
+
+    if (idx < 0 || bytes + limits.getInt(idx) > 
blockHolders.get(idx).get().getCapacity()) {
+      // Allocation needed.
+      // Math.max(allocationSize, bytes) in case "bytes" is greater than 
SOFT_MAXIMUM_ALLOCATION_SIZE.
+      final Optional<ResourceHolder<WritableMemory>> newMemory =
+          allocator.allocate(Math.max(nextAllocationSize, bytes));
+
+      if (!newMemory.isPresent()) {
+        return false;
+      } else if (newMemory.get().get().getCapacity() < bytes) {
+        // Not enough space in the allocation.
+        newMemory.get().close();
+        return false;
+      } else {
+        addBlock(newMemory.get());
+
+        if (!blocksPackedAndInitialSize && nextAllocationSize < 
SOFT_MAXIMUM_ALLOCATION_SIZE) {
+          // Increase size each time we add an allocation, to minimize the 
number of overall allocations.
+          nextAllocationSize *= 2;
+        }
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Advances the cursor a certain number of bytes. This number of bytes must 
not exceed the space available in the
+   * current block. Typically, it is used to commit the memory most recently 
reserved by {@link #reserve}.
+   */
+  public void advanceCursor(final int bytes)
+  {
+    final int blockNumber = currentBlockNumber();
+
+    if (blockNumber < 0) {
+      throw new ISE("No memory; must call 'reserve' first");
+    }
+
+    final int currentLimit = limits.getInt(blockNumber);
+    final WritableMemory currentBlockMemory = 
blockHolders.get(blockNumber).get();
+    final long available = currentBlockMemory.getCapacity() - currentLimit;
+
+    if (bytes > available) {
+      throw new IAE(
+          "Cannot advance [%d] bytes; current block only has [%d] additional 
bytes",
+          bytes,
+          available
+      );
+    }
+
+    final int newLimit = currentLimit + bytes;
+    limits.set(blockNumber, newLimit);
+    cursor.set(currentBlockMemory, newLimit, currentBlockMemory.getCapacity() 
- newLimit);
+  }
+
+  /**
+   * Rewinds the cursor a certain number of bytes, effectively erasing them. 
This number of bytes must not exceed
+   * the current block. Typically, it is used to erase the memory most 
recently advanced by {@link #advanceCursor}.
+   */
+  public void rewindCursor(final int bytes)

Review Comment:
   The expected usage is that callers will call reserveAdditional, then write 
some stuff, then call advanceCursor. Then, they might want to undo it. (This 
happens when undoing the most recent row written.) Callers don't need to undo 
more than the most recent call to advanceCursor, so the block boundary thing 
isn't a problem in practice. I wrote some javadocs clarifying this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to