This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 7d5ccdb7e [VL] Make velox writer queue size configurable (#6341)
7d5ccdb7e is described below
commit 7d5ccdb7ee40726d8077995c58fdc23df2f0bc6a
Author: Kaifei Yi <[email protected]>
AuthorDate: Sat Jul 6 11:58:54 2024 +0800
[VL] Make velox writer queue size configurable (#6341)
---
.../sql/execution/datasources/VeloxColumnarBatchIterator.scala | 4 ++--
.../apache/spark/sql/execution/datasources/VeloxWriteQueue.scala | 5 +++--
.../sql/execution/datasources/velox/VeloxFormatWriterInjects.scala | 7 ++++++-
shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala | 7 +++++++
4 files changed, 18 insertions(+), 5 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxColumnarBatchIterator.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxColumnarBatchIterator.scala
index 5c06b3910..0e6aceddf 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxColumnarBatchIterator.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxColumnarBatchIterator.scala
@@ -26,10 +26,10 @@ import org.apache.arrow.vector.types.pojo.Schema
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
-class VeloxColumnarBatchIterator(schema: Schema, allocator: BufferAllocator)
+class VeloxColumnarBatchIterator(schema: Schema, allocator: BufferAllocator,
queueSize: Int)
extends Iterator[ColumnarBatch]
with AutoCloseable {
- private val writeQueue = new ArrayBlockingQueue[ColumnarBatch](64)
+ private val writeQueue = new ArrayBlockingQueue[ColumnarBatch](queueSize)
private var currentBatch: Option[ColumnarBatch] = None
def enqueue(batch: ColumnarBatch): Unit = {
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxWriteQueue.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxWriteQueue.scala
index b2905e157..6e3e64796 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxWriteQueue.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxWriteQueue.scala
@@ -40,9 +40,10 @@ class VeloxWriteQueue(
schema: Schema,
allocator: BufferAllocator,
datasourceJniWrapper: DatasourceJniWrapper,
- outputFileURI: String)
+ outputFileURI: String,
+ queueSize: Int)
extends AutoCloseable {
- private val scanner = new VeloxColumnarBatchIterator(schema, allocator)
+ private val scanner = new VeloxColumnarBatchIterator(schema, allocator,
queueSize)
private val writeException = new AtomicReference[Throwable]
private val writeThread = new Thread(
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
index ebbf959d0..6901bfffd 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.execution.datasources.velox
+import org.apache.gluten.GlutenConfig
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.datasource.DatasourceJniWrapper
import org.apache.gluten.exception.GlutenException
@@ -73,6 +74,9 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase {
cSchema.close()
}
+ // FIXME: remove this once we support push-based write.
+ val queueSize =
context.getConfiguration.getInt(GlutenConfig.VELOX_WRITER_QUEUE_SIZE.key, 64)
+
val writeQueue =
new VeloxWriteQueue(
TaskResources.getLocalTaskContext(),
@@ -80,7 +84,8 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase {
arrowSchema,
allocator,
datasourceJniWrapper,
- filePath)
+ filePath,
+ queueSize)
new OutputWriter {
override def write(row: InternalRow): Unit = {
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index ef4618fca..4cf81af5a 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -1599,6 +1599,13 @@ object GlutenConfig {
.booleanConf
.createOptional
+ val VELOX_WRITER_QUEUE_SIZE =
+ buildConf("spark.gluten.sql.velox.writer.queue.size")
+ .internal()
+ .doc("This is config to specify the velox writer queue size")
+ .intConf
+ .createWithDefault(64)
+
val NATIVE_HIVEFILEFORMAT_WRITER_ENABLED =
buildConf("spark.gluten.sql.native.hive.writer.enabled")
.internal()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]