LakshSingla commented on code in PR #12745: URL: https://github.com/apache/druid/pull/12745#discussion_r916380034
########## processing/src/main/java/org/apache/druid/frame/segment/row/CursorFrameRowPointer.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.segment.row; + +import com.google.common.base.Preconditions; +import org.apache.datasketches.memory.Memory; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.FrameType; +import org.apache.druid.frame.write.RowBasedFrameWriter; +import org.apache.druid.segment.data.ReadableOffset; + Review Comment: nit: Could we add a Javadoc for this class? I am unable to understand what the `update()` method does. ########## processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java: ########## @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.allocation; + +import com.google.common.primitives.Ints; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import it.unimi.dsi.fastutil.ints.IntList; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.collections.ResourceHolder; +import org.apache.druid.io.Channels; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +/** + * A class that allows writing to a series of Memory blocks as if they are one big coherent chunk of memory. Memory + * is allocated along the way using a {@link MemoryAllocator}. Useful for situations where you don't know ahead of time + * exactly how much memory you'll need. + */ +public class AppendableMemory implements Closeable +{ + private static final int NO_BLOCK = -1; + + // Reasonable initial allocation size. Multiple of 4, 5, 8, and 9; meaning int, int + byte, long, and long + byte can + // all be packed into blocks (see blocksPackedAndInitialSize). + private static final int DEFAULT_INITIAL_ALLOCATION_SIZE = 360; + + // Largest allocation that we will do, unless "reserve" is called with a number bigger than this. 
+ private static final int SOFT_MAXIMUM_ALLOCATION_SIZE = DEFAULT_INITIAL_ALLOCATION_SIZE * 4096; + + private final MemoryAllocator allocator; + private int nextAllocationSize; + + // One holder for every Memory we've allocated. + private final List<ResourceHolder<WritableMemory>> blockHolders = new ArrayList<>(); + + // The amount of space that has been used from each Memory block. Same length as "memoryHolders". + private final IntList limits = new IntArrayList(); + + // The global starting position for each Memory block (blockNumber -> position). Same length as "memoryHolders". + private final LongArrayList globalStartPositions = new LongArrayList(); + + // Whether the blocks we've allocated are "packed"; meaning all non-final block limits equal the allocationSize. + private boolean blocksPackedAndInitialSize = true; + + // Return value of cursor(). + private final MemoryRange<WritableMemory> cursor; + + private AppendableMemory(final MemoryAllocator allocator, final int initialAllocationSize) + { + this.allocator = allocator; + this.nextAllocationSize = initialAllocationSize; + this.cursor = new MemoryRange<>(null, 0, 0); + } + + /** + * Creates an appendable memory instance with a default initial allocation size. This default size can accept + * is a multiple of 4, 5, 8, and 9, meaning that {@link #reserve} can accept allocations of that size and + * remain fully-packed. + */ + public static AppendableMemory create(final MemoryAllocator allocator) + { + return new AppendableMemory(allocator, DEFAULT_INITIAL_ALLOCATION_SIZE); + } + + /** + * Creates an appendable memory instance using a particular initial allocation size. + */ + public static AppendableMemory create(final MemoryAllocator allocator, final int initialAllocationSize) + { + return new AppendableMemory(allocator, initialAllocationSize); + } + + /** + * Return a pointer to the current cursor location, which is where the next elements should be written. 
The returned + * object is updated on calls to {@link #advanceCursor} and {@link #rewindCursor}. + * + * The start of the returned range is the cursor location; the end is the end of the current Memory block. + * + * The returned Memory object is in little-endian order. + */ + public MemoryRange<WritableMemory> cursor() + { + return cursor; + } + + /** + * Ensure that at least "bytes" amount of space is available after the cursor. Allocates a new block if needed. + * Note: the amount of bytes is guaranteed to be in a *single* block. + * + * Does not move the cursor forward. + * + * @return true if reservation was successful, false otherwise. + */ + public boolean reserve(final int bytes) Review Comment: Would it be more appropriate to rename `reserve()` to `reserveAdditional()` or something equivalent? While the Javadoc specifies that the argument is the amount of space needed after the cursor, this might not be apparent from the name. (For reference, with realloc() in C we specify the total size that we want the existing chunk reallocated to, not the difference.) ########## processing/src/main/java/org/apache/druid/frame/Frame.java: ########## @@ -0,0 +1,452 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.druid.frame; + +import com.google.common.primitives.Ints; +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4SafeDecompressor; +import org.apache.datasketches.memory.Memory; +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.io.Channels; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.WritableByteChannel; + +/** + * A data frame. + * + * Frames are split into contiguous "regions". With columnar frames ({@link FrameType#COLUMNAR}) each region + * is a column. With row-based frames ({@link FrameType#ROW_BASED}) there are always two regions: row offsets + * and row data. + * + * This object is lightweight. It has constant overhead regardless of the number of rows or regions. + * + * Frames wrap some {@link Memory}. If the memory is backed by a resource that requires explicit releasing, such as + * direct off-heap memory or a memory-mapped file, the creator of the Memory is responsible for releasing that resource + * when the frame is no longer needed. + * + * Frames are written with {@link org.apache.druid.frame.write.FrameWriter} and read with + * {@link org.apache.druid.frame.read.FrameReader}. + * + * Frame format: + * + * - 1 byte: {@link FrameType#version()} + * - 8 bytes: size in bytes of the frame, little-endian long + * - 4 bytes: number of rows, little-endian int + * - 4 bytes: number of regions, little-endian int + * - 1 byte: 0 if frame is nonpermuted, 1 if frame is permuted + * - 4 bytes x numRows: permutation section; only present for permuted frames. Array of little-endian ints mapping + * logical row numbers to physical row numbers. + * - 8 bytes x numRegions: region offsets. 
Array of end offsets of each region (exclusive), relative to start of frame, + * as little-endian longs. + * - NNN bytes: regions, back-to-back. + * + * There is also a compressed frame format. Compressed frames are written by {@link #writeTo} when "compress" is + * true, and decompressed by {@link #decompress}. Format: + * + * - 8 bytes: compressed frame length, little-endian long + * - 8 bytes: uncompressed frame length (numBytes), little-endian long + * - NNN bytes: LZ4-compressed frame + * - 8 bytes: 64-bit xxhash checksum of prior content, including 16-byte header and compressed frame, little-endian long + */ +public class Frame +{ + public static final long HEADER_SIZE = + Byte.BYTES /* version */ + + Long.BYTES /* total size */ + + Integer.BYTES /* number of rows */ + + Integer.BYTES /* number of columns */ + + Byte.BYTES /* permuted flag */; + + private static final LZ4Compressor LZ4_COMPRESSOR = LZ4Factory.fastestInstance().fastCompressor(); + private static final LZ4SafeDecompressor LZ4_DECOMPRESSOR = LZ4Factory.fastestInstance().safeDecompressor(); + private static final int COMPRESSED_FRAME_HEADER_SIZE = Long.BYTES * 2; // Compressed, uncompressed lengths + private static final int COMPRESSED_FRAME_TRAILER_SIZE = Long.BYTES; // Checksum + private static final int COMPRESSED_FRAME_ENVELOPE_SIZE = COMPRESSED_FRAME_HEADER_SIZE + + COMPRESSED_FRAME_TRAILER_SIZE; + private static final int CHECKSUM_SEED = 0; + + private final Memory memory; + private final FrameType frameType; + private final long numBytes; + private final int numRows; + private final int numRegions; + private final boolean permuted; + + private Frame(Memory memory, FrameType frameType, long numBytes, int numRows, int numRegions, boolean permuted) + { + this.memory = memory; + this.frameType = frameType; + this.numBytes = numBytes; + this.numRows = numRows; + this.numRegions = numRegions; + this.permuted = permuted; + } + + /** + * Returns a frame backed by the provided Memory. 
This operation does not do any copies or allocations. + * + * The Memory must be in little-endian byte order. + * + * Behavior is undefined if the memory is modified anytime during the lifetime of the Frame object. + */ + public static Frame wrap(final Memory memory) + { + if (memory.getTypeByteOrder() != ByteOrder.LITTLE_ENDIAN) { + throw new IAE("Memory must be little-endian"); + } + + if (memory.getCapacity() < HEADER_SIZE) { + throw new IAE("Memory too short for a header"); + } + + final byte version = memory.getByte(0); + final FrameType frameType = FrameType.forVersion(version); + + if (frameType == null) { + throw new IAE("Unexpected byte [%s] at start of frame", version); + } + + final long numBytes = memory.getLong(Byte.BYTES); + final int numRows = memory.getInt(Byte.BYTES + Long.BYTES); + final int numRegions = memory.getInt(Byte.BYTES + Long.BYTES + Integer.BYTES); + final boolean permuted = memory.getByte(Byte.BYTES + Long.BYTES + Integer.BYTES + Integer.BYTES) != 0; + + if (numBytes != memory.getCapacity()) { Review Comment: It seems that we have extracted the necessary information required to generate the `Frame` object above. Should the code detecting the correctness of the frame be extracted into a separate method and be called here? That would also allow for quicker testing in case that is desirable. ########## processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java: ########## @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.allocation; + +import com.google.common.primitives.Ints; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import it.unimi.dsi.fastutil.ints.IntList; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.collections.ResourceHolder; +import org.apache.druid.io.Channels; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +/** + * A class that allows writing to a series of Memory blocks as if they are one big coherent chunk of memory. Memory + * is allocated along the way using a {@link MemoryAllocator}. Useful for situations where you don't know ahead of time + * exactly how much memory you'll need. + */ +public class AppendableMemory implements Closeable +{ + private static final int NO_BLOCK = -1; + + // Reasonable initial allocation size. Multiple of 4, 5, 8, and 9; meaning int, int + byte, long, and long + byte can + // all be packed into blocks (see blocksPackedAndInitialSize). + private static final int DEFAULT_INITIAL_ALLOCATION_SIZE = 360; + + // Largest allocation that we will do, unless "reserve" is called with a number bigger than this. 
+ private static final int SOFT_MAXIMUM_ALLOCATION_SIZE = DEFAULT_INITIAL_ALLOCATION_SIZE * 4096; + + private final MemoryAllocator allocator; + private int nextAllocationSize; + + // One holder for every Memory we've allocated. + private final List<ResourceHolder<WritableMemory>> blockHolders = new ArrayList<>(); + + // The amount of space that has been used from each Memory block. Same length as "memoryHolders". + private final IntList limits = new IntArrayList(); + + // The global starting position for each Memory block (blockNumber -> position). Same length as "memoryHolders". + private final LongArrayList globalStartPositions = new LongArrayList(); + + // Whether the blocks we've allocated are "packed"; meaning all non-final block limits equal the allocationSize. + private boolean blocksPackedAndInitialSize = true; + + // Return value of cursor(). + private final MemoryRange<WritableMemory> cursor; + + private AppendableMemory(final MemoryAllocator allocator, final int initialAllocationSize) + { + this.allocator = allocator; + this.nextAllocationSize = initialAllocationSize; + this.cursor = new MemoryRange<>(null, 0, 0); + } + + /** + * Creates an appendable memory instance with a default initial allocation size. This default size can accept + * is a multiple of 4, 5, 8, and 9, meaning that {@link #reserve} can accept allocations of that size and + * remain fully-packed. + */ + public static AppendableMemory create(final MemoryAllocator allocator) + { + return new AppendableMemory(allocator, DEFAULT_INITIAL_ALLOCATION_SIZE); + } + + /** + * Creates an appendable memory instance using a particular initial allocation size. + */ + public static AppendableMemory create(final MemoryAllocator allocator, final int initialAllocationSize) + { + return new AppendableMemory(allocator, initialAllocationSize); + } + + /** + * Return a pointer to the current cursor location, which is where the next elements should be written. 
The returned + * object is updated on calls to {@link #advanceCursor} and {@link #rewindCursor}. + * + * The start of the returned range is the cursor location; the end is the end of the current Memory block. + * + * The returned Memory object is in little-endian order. + */ + public MemoryRange<WritableMemory> cursor() + { + return cursor; + } + + /** + * Ensure that at least "bytes" amount of space is available after the cursor. Allocates a new block if needed. + * Note: the amount of bytes is guaranteed to be in a *single* block. + * + * Does not move the cursor forward. + * + * @return true if reservation was successful, false otherwise. + */ + public boolean reserve(final int bytes) + { + if (bytes < 0) { + throw new IAE("Cannot reserve negative bytes"); + } + + if (bytes == 0) { + return true; + } + + if (bytes > allocator.available()) { + return false; + } + + final int idx = blockHolders.size() - 1; + + if (idx < 0 || bytes + limits.getInt(idx) > blockHolders.get(idx).get().getCapacity()) { + // Allocation needed. + // Math.max(allocationSize, bytes) in case "bytes" is greater than SOFT_MAXIMUM_ALLOCATION_SIZE. + final Optional<ResourceHolder<WritableMemory>> newMemory = + allocator.allocate(Math.max(nextAllocationSize, bytes)); + + if (!newMemory.isPresent()) { + return false; + } else if (newMemory.get().get().getCapacity() < bytes) { + // Not enough space in the allocation. + newMemory.get().close(); + return false; + } else { + addBlock(newMemory.get()); + + if (!blocksPackedAndInitialSize && nextAllocationSize < SOFT_MAXIMUM_ALLOCATION_SIZE) { + // Increase size each time we add an allocation, to minimize the number of overall allocations. + nextAllocationSize *= 2; + } + } + } + + return true; + } + + /** + * Advances the cursor a certain number of bytes. This number of bytes must not exceed the space available in the + * current block. Typically, it is used to commit the memory most recently reserved by {@link #reserve}. 
+ */ + public void advanceCursor(final int bytes) + { + final int blockNumber = currentBlockNumber(); + + if (blockNumber < 0) { + throw new ISE("No memory; must call 'reserve' first"); + } + + final int currentLimit = limits.getInt(blockNumber); + final WritableMemory currentBlockMemory = blockHolders.get(blockNumber).get(); + final long available = currentBlockMemory.getCapacity() - currentLimit; + + if (bytes > available) { + throw new IAE( + "Cannot advance [%d] bytes; current block only has [%d] additional bytes", + bytes, + available + ); + } + + final int newLimit = currentLimit + bytes; + limits.set(blockNumber, newLimit); + cursor.set(currentBlockMemory, newLimit, currentBlockMemory.getCapacity() - newLimit); + } + + /** + * Rewinds the cursor a certain number of bytes, effectively erasing them. This number of bytes must not exceed + * the current block. Typically, it is used to erase the memory most recently advanced by {@link #advanceCursor}. + */ + public void rewindCursor(final int bytes) Review Comment: I have a question here: since `AppendableMemory` is meant for allocating memory dynamically on the fly, it seems awkward that the caller of this method is expected to remember the size of the most recently allocated block rather than the total memory allocated in the `AppendableMemory`. Intuitively, blocks are a concept that should remain internal to `AppendableMemory`, and rewinding should also be allowed past the start of the current block. Any thoughts on this? (The additional blocks would stay as they are and would not be freed, since we have only rewound the cursor. This would mean `advanceCursor` would also need some logic changes.) ########## processing/src/main/java/org/apache/druid/frame/Frame.java: ########## @@ -0,0 +1,452 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame; + +import com.google.common.primitives.Ints; +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4SafeDecompressor; +import org.apache.datasketches.memory.Memory; +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.io.Channels; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.WritableByteChannel; + +/** + * A data frame. + * + * Frames are split into contiguous "regions". With columnar frames ({@link FrameType#COLUMNAR}) each region + * is a column. With row-based frames ({@link FrameType#ROW_BASED}) there are always two regions: row offsets + * and row data. + * + * This object is lightweight. It has constant overhead regardless of the number of rows or regions. + * + * Frames wrap some {@link Memory}. If the memory is backed by a resource that requires explicit releasing, such as + * direct off-heap memory or a memory-mapped file, the creator of the Memory is responsible for releasing that resource + * when the frame is no longer needed. 
+ * + * Frames are written with {@link org.apache.druid.frame.write.FrameWriter} and read with + * {@link org.apache.druid.frame.read.FrameReader}. + * + * Frame format: + * + * - 1 byte: {@link FrameType#version()} + * - 8 bytes: size in bytes of the frame, little-endian long + * - 4 bytes: number of rows, little-endian int + * - 4 bytes: number of regions, little-endian int + * - 1 byte: 0 if frame is nonpermuted, 1 if frame is permuted + * - 4 bytes x numRows: permutation section; only present for permuted frames. Array of little-endian ints mapping + * logical row numbers to physical row numbers. + * - 8 bytes x numRegions: region offsets. Array of end offsets of each region (exclusive), relative to start of frame, + * as little-endian longs. + * - NNN bytes: regions, back-to-back. + * + * There is also a compressed frame format. Compressed frames are written by {@link #writeTo} when "compress" is + * true, and decompressed by {@link #decompress}. Format: + * + * - 8 bytes: compressed frame length, little-endian long + * - 8 bytes: uncompressed frame length (numBytes), little-endian long + * - NNN bytes: LZ4-compressed frame + * - 8 bytes: 64-bit xxhash checksum of prior content, including 16-byte header and compressed frame, little-endian long + */ +public class Frame +{ + public static final long HEADER_SIZE = + Byte.BYTES /* version */ + + Long.BYTES /* total size */ + + Integer.BYTES /* number of rows */ + + Integer.BYTES /* number of columns */ + + Byte.BYTES /* permuted flag */; + + private static final LZ4Compressor LZ4_COMPRESSOR = LZ4Factory.fastestInstance().fastCompressor(); + private static final LZ4SafeDecompressor LZ4_DECOMPRESSOR = LZ4Factory.fastestInstance().safeDecompressor(); + private static final int COMPRESSED_FRAME_HEADER_SIZE = Long.BYTES * 2; // Compressed, uncompressed lengths + private static final int COMPRESSED_FRAME_TRAILER_SIZE = Long.BYTES; // Checksum + private static final int COMPRESSED_FRAME_ENVELOPE_SIZE = 
COMPRESSED_FRAME_HEADER_SIZE + + COMPRESSED_FRAME_TRAILER_SIZE; + private static final int CHECKSUM_SEED = 0; + + private final Memory memory; + private final FrameType frameType; + private final long numBytes; + private final int numRows; + private final int numRegions; + private final boolean permuted; + + private Frame(Memory memory, FrameType frameType, long numBytes, int numRows, int numRegions, boolean permuted) + { + this.memory = memory; + this.frameType = frameType; + this.numBytes = numBytes; + this.numRows = numRows; + this.numRegions = numRegions; + this.permuted = permuted; + } + + /** + * Returns a frame backed by the provided Memory. This operation does not do any copies or allocations. + * + * The Memory must be in little-endian byte order. + * + * Behavior is undefined if the memory is modified anytime during the lifetime of the Frame object. + */ + public static Frame wrap(final Memory memory) + { + if (memory.getTypeByteOrder() != ByteOrder.LITTLE_ENDIAN) { + throw new IAE("Memory must be little-endian"); + } + + if (memory.getCapacity() < HEADER_SIZE) { + throw new IAE("Memory too short for a header"); + } + + final byte version = memory.getByte(0); + final FrameType frameType = FrameType.forVersion(version); + + if (frameType == null) { + throw new IAE("Unexpected byte [%s] at start of frame", version); + } + + final long numBytes = memory.getLong(Byte.BYTES); + final int numRows = memory.getInt(Byte.BYTES + Long.BYTES); + final int numRegions = memory.getInt(Byte.BYTES + Long.BYTES + Integer.BYTES); + final boolean permuted = memory.getByte(Byte.BYTES + Long.BYTES + Integer.BYTES + Integer.BYTES) != 0; + + if (numBytes != memory.getCapacity()) { + throw new IAE("Declared size [%,d] does not match actual size [%,d]", numBytes, memory.getCapacity()); + } + + // Size of permuted row indices. + final long rowOrderSize = (permuted ? (long) numRows * Integer.BYTES : 0); + + // Size of region ending positions. 
+ final long regionEndSize = (long) numRegions * Long.BYTES; + + final long expectedSizeForPreamble = HEADER_SIZE + rowOrderSize + regionEndSize; + + if (numBytes < expectedSizeForPreamble) { + throw new IAE("Memory too short for preamble"); + } + + // Verify each region is wholly contained within this buffer. + long regionStart = expectedSizeForPreamble; // First region starts immediately after preamble. + long regionEnd; + + for (int regionNumber = 0; regionNumber < numRegions; regionNumber++) { + regionEnd = memory.getLong(HEADER_SIZE + rowOrderSize + (long) regionNumber * Long.BYTES); + + if (regionEnd < regionStart || regionEnd > numBytes) { + throw new ISE( Review Comment: nit: Should these be IAE as above? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
