This is an automated email from the ASF dual-hosted git repository.

philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d5ccdb7e [VL] Make velox writer queue size configurable (#6341)
7d5ccdb7e is described below

commit 7d5ccdb7ee40726d8077995c58fdc23df2f0bc6a
Author: Kaifei Yi <[email protected]>
AuthorDate: Sat Jul 6 11:58:54 2024 +0800

    [VL] Make velox writer queue size configurable (#6341)
---
 .../sql/execution/datasources/VeloxColumnarBatchIterator.scala     | 4 ++--
 .../apache/spark/sql/execution/datasources/VeloxWriteQueue.scala   | 5 +++--
 .../sql/execution/datasources/velox/VeloxFormatWriterInjects.scala | 7 ++++++-
 shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala   | 7 +++++++
 4 files changed, 18 insertions(+), 5 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxColumnarBatchIterator.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxColumnarBatchIterator.scala
index 5c06b3910..0e6aceddf 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxColumnarBatchIterator.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxColumnarBatchIterator.scala
@@ -26,10 +26,10 @@ import org.apache.arrow.vector.types.pojo.Schema
 
 import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
 
-class VeloxColumnarBatchIterator(schema: Schema, allocator: BufferAllocator)
+class VeloxColumnarBatchIterator(schema: Schema, allocator: BufferAllocator, queueSize: Int)
   extends Iterator[ColumnarBatch]
   with AutoCloseable {
-  private val writeQueue = new ArrayBlockingQueue[ColumnarBatch](64)
+  private val writeQueue = new ArrayBlockingQueue[ColumnarBatch](queueSize)
   private var currentBatch: Option[ColumnarBatch] = None
 
   def enqueue(batch: ColumnarBatch): Unit = {
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxWriteQueue.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxWriteQueue.scala
index b2905e157..6e3e64796 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxWriteQueue.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxWriteQueue.scala
@@ -40,9 +40,10 @@ class VeloxWriteQueue(
     schema: Schema,
     allocator: BufferAllocator,
     datasourceJniWrapper: DatasourceJniWrapper,
-    outputFileURI: String)
+    outputFileURI: String,
+    queueSize: Int)
   extends AutoCloseable {
-  private val scanner = new VeloxColumnarBatchIterator(schema, allocator)
+  private val scanner = new VeloxColumnarBatchIterator(schema, allocator, queueSize)
   private val writeException = new AtomicReference[Throwable]
 
   private val writeThread = new Thread(
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
index ebbf959d0..6901bfffd 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.execution.datasources.velox
 
+import org.apache.gluten.GlutenConfig
 import org.apache.gluten.columnarbatch.ColumnarBatches
 import org.apache.gluten.datasource.DatasourceJniWrapper
 import org.apache.gluten.exception.GlutenException
@@ -73,6 +74,9 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase {
       cSchema.close()
     }
 
+    // FIXME: remove this once we support push-based write.
+    val queueSize = context.getConfiguration.getInt(GlutenConfig.VELOX_WRITER_QUEUE_SIZE.key, 64)
+
     val writeQueue =
       new VeloxWriteQueue(
         TaskResources.getLocalTaskContext(),
@@ -80,7 +84,8 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase {
         arrowSchema,
         allocator,
         datasourceJniWrapper,
-        filePath)
+        filePath,
+        queueSize)
 
     new OutputWriter {
       override def write(row: InternalRow): Unit = {
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index ef4618fca..4cf81af5a 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -1599,6 +1599,13 @@ object GlutenConfig {
       .booleanConf
       .createOptional
 
+  val VELOX_WRITER_QUEUE_SIZE =
+    buildConf("spark.gluten.sql.velox.writer.queue.size")
+      .internal()
+      .doc("This is config to specify the velox writer queue size")
+      .intConf
+      .createWithDefault(64)
+
   val NATIVE_HIVEFILEFORMAT_WRITER_ENABLED =
     buildConf("spark.gluten.sql.native.hive.writer.enabled")
       .internal()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to