This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new a722af30d [VL] Fix RowToColumn metric convert time (#6106)
a722af30d is described below
commit a722af30dab3c202033ce4f502d016de0a4edcda
Author: Jin Chengcheng <[email protected]>
AuthorDate: Tue Jun 18 09:05:43 2024 +0800
[VL] Fix RowToColumn metric convert time (#6106)
---
.../gluten/execution/RowToVeloxColumnarExec.scala | 41 +++++++++++-----------
1 file changed, 20 insertions(+), 21 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
index be1bc64e2..5c9c5889b 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
@@ -145,10 +145,20 @@ object RowToVeloxColumnarExec {
}
}
- def nativeConvert(row: UnsafeRow): ColumnarBatch = {
+ def convertToUnsafeRow(row: InternalRow): UnsafeRow = {
+ row match {
+ case unsafeRow: UnsafeRow => unsafeRow
+ case _ =>
+ converter.apply(row)
+ }
+ }
+
+ override def next(): ColumnarBatch = {
+ val firstRow = it.next()
+ val start = System.currentTimeMillis()
+ val row = convertToUnsafeRow(firstRow)
var arrowBuf: ArrowBuf = null
TaskResources.addRecycler("RowToColumnar_arrowBuf", 100) {
- // Remind, remove isOpen here
if (arrowBuf != null && arrowBuf.refCnt() != 0) {
arrowBuf.close()
}
@@ -175,12 +185,14 @@ object RowToVeloxColumnarExec {
rowLength += sizeInBytes.toLong
rowCount += 1
+ convertTime += System.currentTimeMillis() - start
while (rowCount < columnBatchSize && !finished) {
val iterHasNext = it.hasNext
if (!iterHasNext) {
finished = true
} else {
val row = it.next()
+ val start2 = System.currentTimeMillis()
val unsafeRow = convertToUnsafeRow(row)
val sizeInBytes = unsafeRow.getSizeInBytes
if ((offset + sizeInBytes) > arrowBuf.capacity()) {
@@ -198,36 +210,23 @@ object RowToVeloxColumnarExec {
offset += sizeInBytes
rowLength += sizeInBytes.toLong
rowCount += 1
+ convertTime += System.currentTimeMillis() - start2
}
}
numInputRows += rowCount
+ numOutputBatches += 1
+ val startNative = System.currentTimeMillis()
try {
val handle = jniWrapper
.nativeConvertRowToColumnar(r2cHandle, rowLength.toArray,
arrowBuf.memoryAddress())
- ColumnarBatches.create(Runtimes.contextInstance(), handle)
+ val cb = ColumnarBatches.create(Runtimes.contextInstance(), handle)
+ convertTime += System.currentTimeMillis() - startNative
+ cb
} finally {
arrowBuf.close()
arrowBuf = null
}
}
-
- def convertToUnsafeRow(row: InternalRow): UnsafeRow = {
- row match {
- case unsafeRow: UnsafeRow => unsafeRow
- case _ =>
- converter.apply(row)
- }
- }
-
- override def next(): ColumnarBatch = {
- val firstRow = it.next()
- val start = System.currentTimeMillis()
- val unsafeRow = convertToUnsafeRow(firstRow)
- val cb = nativeConvert(unsafeRow)
- numOutputBatches += 1
- convertTime += System.currentTimeMillis() - start
- cb
- }
}
Iterators
.wrap(res)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]