lidavidm commented on code in PR #41285:
URL: https://github.com/apache/arrow/pull/41285#discussion_r1594966203


##########
java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueViewVector.java:
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.complex;
+
+import static org.apache.arrow.memory.util.LargeMemoryUtil.capAtMaxInt;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.CommonUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.AddOrGetResult;
+import org.apache.arrow.vector.BaseFixedWidthVector;
+import org.apache.arrow.vector.BaseValueVector;
+import org.apache.arrow.vector.BaseVariableWidthVector;
+import org.apache.arrow.vector.DensityAwareVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.NullVector;
+import org.apache.arrow.vector.UInt4Vector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.ZeroVector;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.util.CallBack;
+import org.apache.arrow.vector.util.OversizedAllocationException;
+import org.apache.arrow.vector.util.SchemaChangeRuntimeException;
+
+public abstract class BaseRepeatedValueViewVector extends BaseValueVector
+    implements RepeatedValueVector, BaseListVector {
+
+  public static final FieldVector DEFAULT_DATA_VECTOR = ZeroVector.INSTANCE;
+  public static final String DATA_VECTOR_NAME = "$data$";
+
+  public static final byte OFFSET_WIDTH = 4;
+  public static final byte SIZE_WIDTH = 4;
+  protected ArrowBuf offsetBuffer;
+  protected ArrowBuf sizeBuffer;
+  protected FieldVector vector;
+  protected final CallBack repeatedCallBack;
+  protected int valueCount;
+  protected long offsetAllocationSizeInBytes = INITIAL_VALUE_ALLOCATION * 
OFFSET_WIDTH;
+  protected long sizeAllocationSizeInBytes = INITIAL_VALUE_ALLOCATION * 
SIZE_WIDTH;
+  private final String name;
+
+  protected String defaultDataVectorName = DATA_VECTOR_NAME;
+
+  protected BaseRepeatedValueViewVector(String name, BufferAllocator 
allocator, CallBack callBack) {
+    this(name, allocator, DEFAULT_DATA_VECTOR, callBack);
+  }
+
+  protected BaseRepeatedValueViewVector(
+      String name, BufferAllocator allocator, FieldVector vector, CallBack 
callBack) {
+    super(allocator);
+    this.name = name;
+    this.offsetBuffer = allocator.getEmpty();
+    this.sizeBuffer = allocator.getEmpty();
+    this.vector = Preconditions.checkNotNull(vector, "data vector cannot be 
null");
+    this.repeatedCallBack = callBack;
+    this.valueCount = 0;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public boolean allocateNewSafe() {
+    boolean dataAlloc = false;
+    try {
+      allocateBuffers();
+      dataAlloc = vector.allocateNewSafe();
+    } catch (Exception e) {
+      clear();
+      return false;
+    } finally {
+      if (!dataAlloc) {
+        clear();
+      }
+    }
+    return dataAlloc;
+  }
+
+  private void allocateBuffers() {
+    offsetBuffer = allocateOffsetBuffer(offsetAllocationSizeInBytes);
+    sizeBuffer = allocateSizeBuffer(sizeAllocationSizeInBytes);
+  }
+
+  private ArrowBuf allocateOffsetBuffer(final long size) {
+    final int curSize = (int) size;
+    ArrowBuf offsetBuffer = allocator.buffer(curSize);
+    offsetBuffer.readerIndex(0);
+    offsetAllocationSizeInBytes = curSize;
+    offsetBuffer.setZero(0, offsetBuffer.capacity());
+    return offsetBuffer;
+  }
+
+  private ArrowBuf allocateSizeBuffer(final long size) {
+    final int curSize = (int) size;
+    ArrowBuf sizeBuffer = allocator.buffer(curSize);
+    sizeBuffer.readerIndex(0);
+    sizeAllocationSizeInBytes = curSize;
+    sizeBuffer.setZero(0, sizeBuffer.capacity());
+    return sizeBuffer;
+  }
+
+  @Override
+  public void reAlloc() {
+    reallocateBuffers();
+    vector.reAlloc();
+  }
+
+  protected void reallocateBuffers() {
+    reallocOffsetBuffer();
+    reallocSizeBuffer();
+  }
+
+  private void reallocOffsetBuffer() {
+    final long currentBufferCapacity = offsetBuffer.capacity();
+    long newAllocationSize = currentBufferCapacity * 2;
+    if (newAllocationSize == 0) {
+      if (offsetAllocationSizeInBytes > 0) {
+        newAllocationSize = offsetAllocationSizeInBytes;
+      } else {
+        newAllocationSize = INITIAL_VALUE_ALLOCATION * OFFSET_WIDTH * 2;
+      }
+    }
+
+    newAllocationSize = CommonUtil.nextPowerOfTwo(newAllocationSize);
+    newAllocationSize = Math.min(newAllocationSize, (long) OFFSET_WIDTH * 
Integer.MAX_VALUE);
+    assert newAllocationSize >= 1;
+
+    if (newAllocationSize > MAX_ALLOCATION_SIZE || newAllocationSize <= 
offsetBuffer.capacity()) {
+      throw new OversizedAllocationException("Unable to expand the buffer");
+    }
+
+    final ArrowBuf newBuf = allocator.buffer(newAllocationSize);
+    newBuf.setBytes(0, offsetBuffer, 0, currentBufferCapacity);
+    newBuf.setZero(currentBufferCapacity, newBuf.capacity() - 
currentBufferCapacity);
+    offsetBuffer.getReferenceManager().release(1);
+    offsetBuffer = newBuf;
+    offsetAllocationSizeInBytes = newAllocationSize;
+  }
+
+  private void reallocSizeBuffer() {
+    final long currentBufferCapacity = sizeBuffer.capacity();
+    long newAllocationSize = currentBufferCapacity * 2;
+    if (newAllocationSize == 0) {
+      if (sizeAllocationSizeInBytes > 0) {
+        newAllocationSize = sizeAllocationSizeInBytes;
+      } else {
+        newAllocationSize = INITIAL_VALUE_ALLOCATION * SIZE_WIDTH * 2;
+      }
+    }
+
+    newAllocationSize = CommonUtil.nextPowerOfTwo(newAllocationSize);
+    newAllocationSize = Math.min(newAllocationSize, (long) SIZE_WIDTH * 
Integer.MAX_VALUE);
+    assert newAllocationSize >= 1;
+
+    if (newAllocationSize > MAX_ALLOCATION_SIZE || newAllocationSize <= 
sizeBuffer.capacity()) {
+      throw new OversizedAllocationException("Unable to expand the buffer");
+    }
+
+    final ArrowBuf newBuf = allocator.buffer(newAllocationSize);
+    newBuf.setBytes(0, sizeBuffer, 0, currentBufferCapacity);
+    newBuf.setZero(currentBufferCapacity, newBuf.capacity() - 
currentBufferCapacity);
+    sizeBuffer.getReferenceManager().release(1);
+    sizeBuffer = newBuf;
+    sizeAllocationSizeInBytes = newAllocationSize;
+  }
+
+  @Override
+  public FieldVector getDataVector() {
+    return vector;
+  }
+
+  @Override
+  public void setInitialCapacity(int numRecords) {
+    offsetAllocationSizeInBytes = (numRecords) * OFFSET_WIDTH;
+    sizeAllocationSizeInBytes = (numRecords) * SIZE_WIDTH;
+    if (vector instanceof BaseFixedWidthVector || vector instanceof 
BaseVariableWidthVector) {
+      vector.setInitialCapacity(numRecords * 
RepeatedValueVector.DEFAULT_REPEAT_PER_RECORD);
+    } else {
+      vector.setInitialCapacity(numRecords);
+    }
+  }
+
+  @Override
+  public void setInitialCapacity(int numRecords, double density) {
+    if ((numRecords * density) >= Integer.MAX_VALUE) {
+      throw new OversizedAllocationException("Requested amount of memory is 
more than max allowed");
+    }
+
+    offsetAllocationSizeInBytes = numRecords * OFFSET_WIDTH;
+    sizeAllocationSizeInBytes = numRecords * SIZE_WIDTH;
+
+    int innerValueCapacity = Math.max((int) (numRecords * density), 1);
+
+    if (vector instanceof DensityAwareVector) {
+      ((DensityAwareVector) vector).setInitialCapacity(innerValueCapacity, 
density);
+    } else {
+      vector.setInitialCapacity(innerValueCapacity);
+    }
+  }
+
+  /**
+   * Specialized version of setInitialTotalCapacity() for ListViewVector.
+   * This is used by some callers when they want to explicitly control and be
+   * conservative about memory allocated for inner data vector.
+   * This is very useful when we are working with memory constraints for a 
query
+   * and have a fixed amount of memory reserved for the record batch.
+   * In such cases, we are likely to face OOM or related problems when
+   * we reserve memory for a record batch with value count x and
+   * do setInitialCapacity(x) such that each vector allocates only
+   * what is necessary and not the default amount, but the multiplier
+   * forces the memory requirement to go beyond what was needed.
+   *
+   * @param numRecords value count
+   * @param totalNumberOfElements the total number of elements to allow
+   *                              for in this vector across all records.
+   */
+  public void setInitialTotalCapacity(int numRecords, int 
totalNumberOfElements) {
+    offsetAllocationSizeInBytes = numRecords * OFFSET_WIDTH;
+    sizeAllocationSizeInBytes = numRecords * SIZE_WIDTH;
+    vector.setInitialCapacity(totalNumberOfElements);
+  }
+
+  @Override
+  public int getValueCapacity() {
+    return 0;
+  }
+
+  protected int getOffsetBufferValueCapacity() {
+    return capAtMaxInt(offsetBuffer.capacity() / OFFSET_WIDTH);
+  }
+
+  protected int getSizeBufferValueCapacity() {
+    return capAtMaxInt(sizeBuffer.capacity() / SIZE_WIDTH);
+  }
+
+  @Override
+  public int getBufferSize() {
+    if (valueCount == 0) {
+      return 0;
+    }
+    return (valueCount * OFFSET_WIDTH) + (valueCount * SIZE_WIDTH) + 
vector.getBufferSize();
+  }
+
+  @Override
+  public int getBufferSizeFor(int valueCount) {
+    if (valueCount == 0) {
+      return 0;
+    }
+
+    int innerVectorValueCount = 0;
+
+    for (int i = 0; i < valueCount; i++) {
+      innerVectorValueCount += sizeBuffer.getInt(i * SIZE_WIDTH);
+    }
+
+    return (valueCount * OFFSET_WIDTH) + (valueCount * SIZE_WIDTH) +
+        vector.getBufferSizeFor(innerVectorValueCount);
+  }
+
+  @Override
+  public Iterator<ValueVector> iterator() {
+    return Collections.<ValueVector>singleton(getDataVector()).iterator();
+  }
+
+  @Override
+  public void clear() {
+    offsetBuffer = releaseBuffer(offsetBuffer);
+    sizeBuffer = releaseBuffer(sizeBuffer);
+    vector.clear();
+    valueCount = 0;
+    super.clear();
+  }
+
+  @Override
+  public void reset() {
+    offsetBuffer.setZero(0, offsetBuffer.capacity());
+    sizeBuffer.setZero(0, sizeBuffer.capacity());
+    vector.reset();
+    valueCount = 0;
+  }
+
+  @Override
+  public ArrowBuf[] getBuffers(boolean clear) {
+    return new ArrowBuf[0];
+  }
+
+  @Override
+  public int getValueCount() {
+    return valueCount;
+  }
+
+  @Override
+  public boolean isNull(int index) {
+    return false;
+  }
+
+  @Override
+  public void setValueCount(int valueCount) {
+    this.valueCount = valueCount;
+    while (valueCount > getOffsetBufferValueCapacity()) {
+      reallocateBuffers();
+    }
+    final int childValueCount = valueCount == 0 ? 0 : getLengthOfChildVector();
+    vector.setValueCount(childValueCount);
+  }
+
+  private int getLengthOfChildVector() {
+    int length = 0;
+    for (int i = 0; i <= valueCount; i++) {
+      length += sizeBuffer.getInt(i * SIZE_WIDTH);

Review Comment:
   It depends on how this is used. Are you using this as the physical length, 
or do you only care about the part that the list vector actually references?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to