This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new db584b0e00 [VL] Refactor RowToVeloxColumnarExec to remove some duplicate code (#9350)
db584b0e00 is described below

commit db584b0e0009b544bbf6955aab5029b292541aff
Author: wypb <[email protected]>
AuthorDate: Wed May 7 19:00:42 2025 +0800

    [VL] Refactor RowToVeloxColumnarExec to remove some duplicate code (#9350)
---
 .../gluten/execution/RowToVeloxColumnarExec.scala  | 43 ++++++++--------------
 1 file changed, 16 insertions(+), 27 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
index ab67448c3d..adb4aeca2a 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
@@ -156,9 +156,6 @@ object RowToVeloxColumnarExec {
       }
 
       override def next(): ColumnarBatch = {
-        val firstRow = it.next()
-        val start = System.currentTimeMillis()
-        val row = convertToUnsafeRow(firstRow)
         var arrowBuf: ArrowBuf = null
         TaskResources.addRecycler("RowToColumnar_arrowBuf", 100) {
           if (arrowBuf != null && arrowBuf.refCnt() != 0) {
@@ -168,35 +165,27 @@ object RowToVeloxColumnarExec {
         val rowLength = new ListBuffer[Long]()
         var rowCount = 0
         var offset = 0L
-        val sizeInBytes = row.getSizeInBytes
-        // allocate buffer based on 1st row, but if first row is very big, this will cause OOM
-        // maybe we should optimize to list ArrayBuf to native to avoid buf close and allocate
-        // 31760L origins from BaseVariableWidthVector.lastValueAllocationSizeInBytes
-        // experimental value
-        val estimatedBufSize = Math.max(
-          Math.min(sizeInBytes.toDouble * columnBatchSize * 1.2, 31760L * columnBatchSize),
-          sizeInBytes.toDouble * 10)
-        arrowBuf = arrowAllocator.buffer(estimatedBufSize.toLong)
-        Platform.copyMemory(
-          row.getBaseObject,
-          row.getBaseOffset,
-          null,
-          arrowBuf.memoryAddress() + offset,
-          sizeInBytes)
-        offset += sizeInBytes
-        rowLength += sizeInBytes.toLong
-        rowCount += 1
-
-        convertTime += System.currentTimeMillis() - start
         while (rowCount < columnBatchSize && !finished) {
-          val iterHasNext = it.hasNext
-          if (!iterHasNext) {
+          if (!it.hasNext) {
             finished = true
           } else {
             val row = it.next()
-            val start2 = System.currentTimeMillis()
+            val start = System.currentTimeMillis()
             val unsafeRow = convertToUnsafeRow(row)
             val sizeInBytes = unsafeRow.getSizeInBytes
+
+            // allocate buffer based on first row
+            if (rowCount == 0) {
+              // allocate buffer based on 1st row, but if first row is very big, this will cause OOM
+              // maybe we should optimize to list ArrayBuf to native to avoid buf close and allocate
+              // 31760L origins from BaseVariableWidthVector.lastValueAllocationSizeInBytes
+              // experimental value
+              val estimatedBufSize = Math.max(
+                Math.min(sizeInBytes.toDouble * columnBatchSize * 1.2, 31760L * columnBatchSize),
+                sizeInBytes.toDouble * 10)
+              arrowBuf = arrowAllocator.buffer(estimatedBufSize.toLong)
+            }
+
             if ((offset + sizeInBytes) > arrowBuf.capacity()) {
               val tmpBuf = arrowAllocator.buffer((offset + sizeInBytes) * 2)
               tmpBuf.setBytes(0, arrowBuf, 0, offset)
@@ -212,7 +201,7 @@ object RowToVeloxColumnarExec {
             offset += sizeInBytes
             rowLength += sizeInBytes.toLong
             rowCount += 1
-            convertTime += System.currentTimeMillis() - start2
+            convertTime += System.currentTimeMillis() - start
           }
         }
         numInputRows += rowCount


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to