This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new db584b0e00 [VL] Refactor RowToVeloxColumnarExec to remove some duplicate code (#9350)
db584b0e00 is described below
commit db584b0e0009b544bbf6955aab5029b292541aff
Author: wypb <[email protected]>
AuthorDate: Wed May 7 19:00:42 2025 +0800
[VL] Refactor RowToVeloxColumnarExec to remove some duplicate code (#9350)
---
.../gluten/execution/RowToVeloxColumnarExec.scala | 43 ++++++++--------------
1 file changed, 16 insertions(+), 27 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
index ab67448c3d..adb4aeca2a 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
@@ -156,9 +156,6 @@ object RowToVeloxColumnarExec {
}
override def next(): ColumnarBatch = {
- val firstRow = it.next()
- val start = System.currentTimeMillis()
- val row = convertToUnsafeRow(firstRow)
var arrowBuf: ArrowBuf = null
TaskResources.addRecycler("RowToColumnar_arrowBuf", 100) {
if (arrowBuf != null && arrowBuf.refCnt() != 0) {
@@ -168,35 +165,27 @@ object RowToVeloxColumnarExec {
val rowLength = new ListBuffer[Long]()
var rowCount = 0
var offset = 0L
- val sizeInBytes = row.getSizeInBytes
- // allocate buffer based on 1st row, but if first row is very big,
this will cause OOM
- // maybe we should optimize to list ArrayBuf to native to avoid buf
close and allocate
- // 31760L origins from
BaseVariableWidthVector.lastValueAllocationSizeInBytes
- // experimental value
- val estimatedBufSize = Math.max(
- Math.min(sizeInBytes.toDouble * columnBatchSize * 1.2, 31760L *
columnBatchSize),
- sizeInBytes.toDouble * 10)
- arrowBuf = arrowAllocator.buffer(estimatedBufSize.toLong)
- Platform.copyMemory(
- row.getBaseObject,
- row.getBaseOffset,
- null,
- arrowBuf.memoryAddress() + offset,
- sizeInBytes)
- offset += sizeInBytes
- rowLength += sizeInBytes.toLong
- rowCount += 1
-
- convertTime += System.currentTimeMillis() - start
while (rowCount < columnBatchSize && !finished) {
- val iterHasNext = it.hasNext
- if (!iterHasNext) {
+ if (!it.hasNext) {
finished = true
} else {
val row = it.next()
- val start2 = System.currentTimeMillis()
+ val start = System.currentTimeMillis()
val unsafeRow = convertToUnsafeRow(row)
val sizeInBytes = unsafeRow.getSizeInBytes
+
+ // allocate buffer based on first row
+ if (rowCount == 0) {
+ // allocate buffer based on 1st row, but if first row is very
big, this will cause OOM
+ // maybe we should optimize to list ArrayBuf to native to avoid
buf close and allocate
+ // 31760L origins from
BaseVariableWidthVector.lastValueAllocationSizeInBytes
+ // experimental value
+ val estimatedBufSize = Math.max(
+ Math.min(sizeInBytes.toDouble * columnBatchSize * 1.2, 31760L
* columnBatchSize),
+ sizeInBytes.toDouble * 10)
+ arrowBuf = arrowAllocator.buffer(estimatedBufSize.toLong)
+ }
+
if ((offset + sizeInBytes) > arrowBuf.capacity()) {
val tmpBuf = arrowAllocator.buffer((offset + sizeInBytes) * 2)
tmpBuf.setBytes(0, arrowBuf, 0, offset)
@@ -212,7 +201,7 @@ object RowToVeloxColumnarExec {
offset += sizeInBytes
rowLength += sizeInBytes.toLong
rowCount += 1
- convertTime += System.currentTimeMillis() - start2
+ convertTime += System.currentTimeMillis() - start
}
}
numInputRows += rowCount
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]