Copilot commented on code in PR #12222:
URL: https://github.com/apache/gluten/pull/12222#discussion_r3356426292


##########
backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala:
##########
@@ -246,9 +238,23 @@ object GlutenDeltaJobStatsTracker extends Logging {
     }
 
     override def closeFile(filePath: String): Unit = {
+      def signalEoS(): Unit = {
+        while (true) {
+          if (currentJsonFuture.isDone) {
+            currentJsonFuture.get()
+            // The future should be an error since we haven't signaled EoS yet.
+            throw new IllegalStateException("Unreachable code.")
+          }
+          val queued = inputBatchQueue.offer(None, 500, 
java.util.concurrent.TimeUnit.MILLISECONDS)
+          if (queued) {
+            // The future should be done after we signal EoS.
+            return
+          }
+        }
+      }

Review Comment:
   closeFile()'s signalEoS() throws IllegalStateException if currentJsonFuture 
completes normally before EoS is enqueued. That can turn an otherwise successful 
stats computation into a hard failure (e.g., if Velox finishes early or the 
barrier yields output without needing EoS). It also loops indefinitely if the 
consumer never takes from the SynchronousQueue and the future never completes, 
which can hang task completion.



##########
backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala:
##########
@@ -259,20 +265,11 @@ object GlutenDeltaJobStatsTracker extends Logging {
       row match {
         case _: PlaceholderRow =>
         case t: TerminalRow =>
-          val valueBatch = t.batch()
-          val numRows = valueBatch.numRows()
-          val dummyKeyVec = ArrowWritableColumnVector
-            .allocateColumns(numRows, new StructType().add(dummyKeyAttr.name, 
IntegerType))
-            .head
-          (0 until numRows).foreach(i => dummyKeyVec.putInt(i, 1))
-          val dummyKeyBatch = VeloxColumnarBatches.toVeloxBatch(
-            ColumnarBatches.offload(
-              ArrowBufferAllocators.contextInstance(),
-              new ColumnarBatch(Array[ColumnVector](dummyKeyVec), numRows)))
-          val compositeBatch = VeloxColumnarBatches.compose(dummyKeyBatch, 
valueBatch)
-          dummyKeyBatch.close()
-          valueBatch.close()
-          inputBatchQueue.put(Some(compositeBatch))
+          val batch = t.batch()
+          // Counts up the reference count for the batch since it
+          // will be consumed by Velox aggregation task asynchronously.
+          ColumnarBatches.retain(batch)
+          inputBatchQueue.put(Some(batch))

Review Comment:
   newRow() retains the batch before putting it into the SynchronousQueue, but 
if put() throws (e.g., task cancellation/interrupt), the retained reference is 
leaked because it is never released/closed on the exceptional path.



##########
backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala:
##########
@@ -250,9 +242,23 @@ object GlutenDeltaJobStatsTracker extends Logging {
     }
 
     override def closeFile(filePath: String): Unit = {
+      def signalEoS(): Unit = {
+        while (true) {
+          if (currentJsonFuture.isDone) {
+            currentJsonFuture.get()
+            // The future should be an error since we haven't signaled EoS yet.
+            throw new IllegalStateException("Unreachable code.")
+          }
+          val queued = inputBatchQueue.offer(None, 500, 
java.util.concurrent.TimeUnit.MILLISECONDS)
+          if (queued) {
+            // The future should be done after we signal EoS.
+            return
+          }
+        }
+      }

Review Comment:
   closeFile()'s signalEoS() throws IllegalStateException if currentJsonFuture 
completes normally before EoS is enqueued. That can turn an otherwise successful 
stats computation into a hard failure (e.g., if Velox finishes early or the 
barrier yields output without needing EoS). It also loops indefinitely if the 
consumer never takes from the SynchronousQueue and the future never completes, 
which can hang task completion.



##########
backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala:
##########
@@ -263,20 +269,11 @@ object GlutenDeltaJobStatsTracker extends Logging {
       row match {
         case _: PlaceholderRow =>
         case t: TerminalRow =>
-          val valueBatch = t.batch()
-          val numRows = valueBatch.numRows()
-          val dummyKeyVec = ArrowWritableColumnVector
-            .allocateColumns(numRows, new StructType().add(dummyKeyAttr.name, 
IntegerType))
-            .head
-          (0 until numRows).foreach(i => dummyKeyVec.putInt(i, 1))
-          val dummyKeyBatch = VeloxColumnarBatches.toVeloxBatch(
-            ColumnarBatches.offload(
-              ArrowBufferAllocators.contextInstance(),
-              new ColumnarBatch(Array[ColumnVector](dummyKeyVec), numRows)))
-          val compositeBatch = VeloxColumnarBatches.compose(dummyKeyBatch, 
valueBatch)
-          dummyKeyBatch.close()
-          valueBatch.close()
-          inputBatchQueue.put(Some(compositeBatch))
+          val batch = t.batch()
+          // Counts up the reference count for the batch since it
+          // will be consumed by Velox aggregation task asynchronously.
+          ColumnarBatches.retain(batch)
+          inputBatchQueue.put(Some(batch))

Review Comment:
   newRow() retains the batch before putting it into the SynchronousQueue, but 
if put() throws (e.g., task cancellation/interrupt), the retained reference is 
leaked because it is never released/closed on the exceptional path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to