This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new a722af30d [VL] Fix RowToColumn metric convert time (#6106)
a722af30d is described below

commit a722af30dab3c202033ce4f502d016de0a4edcda
Author: Jin Chengcheng <[email protected]>
AuthorDate: Tue Jun 18 09:05:43 2024 +0800

    [VL] Fix RowToColumn metric convert time (#6106)
---
 .../gluten/execution/RowToVeloxColumnarExec.scala  | 41 +++++++++++-----------
 1 file changed, 20 insertions(+), 21 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
index be1bc64e2..5c9c5889b 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
@@ -145,10 +145,20 @@ object RowToVeloxColumnarExec {
         }
       }
 
-      def nativeConvert(row: UnsafeRow): ColumnarBatch = {
+      def convertToUnsafeRow(row: InternalRow): UnsafeRow = {
+        row match {
+          case unsafeRow: UnsafeRow => unsafeRow
+          case _ =>
+            converter.apply(row)
+        }
+      }
+
+      override def next(): ColumnarBatch = {
+        val firstRow = it.next()
+        val start = System.currentTimeMillis()
+        val row = convertToUnsafeRow(firstRow)
         var arrowBuf: ArrowBuf = null
         TaskResources.addRecycler("RowToColumnar_arrowBuf", 100) {
-          // Remind, remove isOpen here
           if (arrowBuf != null && arrowBuf.refCnt() != 0) {
             arrowBuf.close()
           }
@@ -175,12 +185,14 @@ object RowToVeloxColumnarExec {
         rowLength += sizeInBytes.toLong
         rowCount += 1
 
+        convertTime += System.currentTimeMillis() - start
         while (rowCount < columnBatchSize && !finished) {
           val iterHasNext = it.hasNext
           if (!iterHasNext) {
             finished = true
           } else {
             val row = it.next()
+            val start2 = System.currentTimeMillis()
             val unsafeRow = convertToUnsafeRow(row)
             val sizeInBytes = unsafeRow.getSizeInBytes
             if ((offset + sizeInBytes) > arrowBuf.capacity()) {
@@ -198,36 +210,23 @@ object RowToVeloxColumnarExec {
             offset += sizeInBytes
             rowLength += sizeInBytes.toLong
             rowCount += 1
+            convertTime += System.currentTimeMillis() - start2
           }
         }
         numInputRows += rowCount
+        numOutputBatches += 1
+        val startNative = System.currentTimeMillis()
         try {
           val handle = jniWrapper
             .nativeConvertRowToColumnar(r2cHandle, rowLength.toArray, 
arrowBuf.memoryAddress())
-          ColumnarBatches.create(Runtimes.contextInstance(), handle)
+          val cb = ColumnarBatches.create(Runtimes.contextInstance(), handle)
+          convertTime += System.currentTimeMillis() - startNative
+          cb
         } finally {
           arrowBuf.close()
           arrowBuf = null
         }
       }
-
-      def convertToUnsafeRow(row: InternalRow): UnsafeRow = {
-        row match {
-          case unsafeRow: UnsafeRow => unsafeRow
-          case _ =>
-            converter.apply(row)
-        }
-      }
-
-      override def next(): ColumnarBatch = {
-        val firstRow = it.next()
-        val start = System.currentTimeMillis()
-        val unsafeRow = convertToUnsafeRow(firstRow)
-        val cb = nativeConvert(unsafeRow)
-        numOutputBatches += 1
-        convertTime += System.currentTimeMillis() - start
-        cb
-      }
     }
     Iterators
       .wrap(res)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to