Copilot commented on code in PR #12222:
URL: https://github.com/apache/gluten/pull/12222#discussion_r3356426292
##########
backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala:
##########
@@ -246,9 +238,23 @@ object GlutenDeltaJobStatsTracker extends Logging {
}
override def closeFile(filePath: String): Unit = {
+ def signalEoS(): Unit = {
+ while (true) {
+ if (currentJsonFuture.isDone) {
+ currentJsonFuture.get()
+ // The future should be an error since we haven't signaled EoS yet.
+ throw new IllegalStateException("Unreachable code.")
+ }
+ val queued = inputBatchQueue.offer(None, 500,
java.util.concurrent.TimeUnit.MILLISECONDS)
+ if (queued) {
+ // The future should be done after we signal EoS.
+ return
+ }
+ }
+ }
Review Comment:
closeFile()'s signalEoS() throws IllegalStateException if currentJsonFuture
completes normally before EoS has been enqueued. That can turn an otherwise
successful stats computation into a hard failure (e.g., if Velox finishes early
or the barrier yields output without needing EoS). It also loops indefinitely if
the consumer never takes from the SynchronousQueue while the future stays
pending, which can hang task completion.
##########
backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala:
##########
@@ -259,20 +265,11 @@ object GlutenDeltaJobStatsTracker extends Logging {
row match {
case _: PlaceholderRow =>
case t: TerminalRow =>
- val valueBatch = t.batch()
- val numRows = valueBatch.numRows()
- val dummyKeyVec = ArrowWritableColumnVector
- .allocateColumns(numRows, new StructType().add(dummyKeyAttr.name,
IntegerType))
- .head
- (0 until numRows).foreach(i => dummyKeyVec.putInt(i, 1))
- val dummyKeyBatch = VeloxColumnarBatches.toVeloxBatch(
- ColumnarBatches.offload(
- ArrowBufferAllocators.contextInstance(),
- new ColumnarBatch(Array[ColumnVector](dummyKeyVec), numRows)))
- val compositeBatch = VeloxColumnarBatches.compose(dummyKeyBatch,
valueBatch)
- dummyKeyBatch.close()
- valueBatch.close()
- inputBatchQueue.put(Some(compositeBatch))
+ val batch = t.batch()
+ // Counts up the reference count for the batch since it
+ // will be consumed by Velox aggregation task asynchronously.
+ ColumnarBatches.retain(batch)
+ inputBatchQueue.put(Some(batch))
Review Comment:
newRow() retains the batch before putting it into the SynchronousQueue, but
if put() throws (e.g., InterruptedException on task cancellation), the retained
reference is leaked because it is never released/closed on the exceptional path.
##########
backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala:
##########
@@ -250,9 +242,23 @@ object GlutenDeltaJobStatsTracker extends Logging {
}
override def closeFile(filePath: String): Unit = {
+ def signalEoS(): Unit = {
+ while (true) {
+ if (currentJsonFuture.isDone) {
+ currentJsonFuture.get()
+ // The future should be an error since we haven't signaled EoS yet.
+ throw new IllegalStateException("Unreachable code.")
+ }
+ val queued = inputBatchQueue.offer(None, 500,
java.util.concurrent.TimeUnit.MILLISECONDS)
+ if (queued) {
+ // The future should be done after we signal EoS.
+ return
+ }
+ }
+ }
Review Comment:
closeFile()'s signalEoS() throws IllegalStateException if currentJsonFuture
completes normally before EoS has been enqueued. That can turn an otherwise
successful stats computation into a hard failure (e.g., if Velox finishes early
or the barrier yields output without needing EoS). It also loops indefinitely if
the consumer never takes from the SynchronousQueue while the future stays
pending, which can hang task completion.
##########
backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala:
##########
@@ -263,20 +269,11 @@ object GlutenDeltaJobStatsTracker extends Logging {
row match {
case _: PlaceholderRow =>
case t: TerminalRow =>
- val valueBatch = t.batch()
- val numRows = valueBatch.numRows()
- val dummyKeyVec = ArrowWritableColumnVector
- .allocateColumns(numRows, new StructType().add(dummyKeyAttr.name,
IntegerType))
- .head
- (0 until numRows).foreach(i => dummyKeyVec.putInt(i, 1))
- val dummyKeyBatch = VeloxColumnarBatches.toVeloxBatch(
- ColumnarBatches.offload(
- ArrowBufferAllocators.contextInstance(),
- new ColumnarBatch(Array[ColumnVector](dummyKeyVec), numRows)))
- val compositeBatch = VeloxColumnarBatches.compose(dummyKeyBatch,
valueBatch)
- dummyKeyBatch.close()
- valueBatch.close()
- inputBatchQueue.put(Some(compositeBatch))
+ val batch = t.batch()
+ // Counts up the reference count for the batch since it
+ // will be consumed by Velox aggregation task asynchronously.
+ ColumnarBatches.retain(batch)
+ inputBatchQueue.put(Some(batch))
Review Comment:
newRow() retains the batch before putting it into the SynchronousQueue, but
if put() throws (e.g., InterruptedException on task cancellation), the retained
reference is leaked because it is never released/closed on the exceptional path.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]