This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 807d9f77c [GLUTEN-6645][VL] Remove VeloxWriteQueue which may introduce deadlock (#6646)
807d9f77c is described below
commit 807d9f77cb412f3ab90ea9e15408019c52941399
Author: WangGuangxin <[email protected]>
AuthorDate: Fri Aug 2 11:26:48 2024 +0800
[GLUTEN-6645][VL] Remove VeloxWriteQueue which may introduce deadlock (#6646)
---
.../datasources/VeloxColumnarBatchIterator.scala | 76 ------------------
.../execution/datasources/VeloxWriteQueue.scala | 90 ----------------------
.../velox/VeloxFormatWriterInjects.scala | 31 ++++----
cpp/core/jni/JniWrapper.cc | 14 +---
.../gluten/datasource/DatasourceJniWrapper.java | 3 +-
.../scala/org/apache/gluten/GlutenConfig.scala | 7 --
6 files changed, 18 insertions(+), 203 deletions(-)
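
For context: the removed design (deleted in the diffs below) handed batches from the task thread to a dedicated write thread through a bounded ArrayBlockingQueue, which is exactly the shape that can deadlock when the consumer side stalls or dies. A minimal, self-contained Scala sketch of that hazard -- illustrative only, not Gluten code:

import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Illustrative only. The removed VeloxWriteQueue handed batches to a
// background write thread through a bounded blocking queue; if that
// thread dies (or blocks inside the native write), the producer stalls
// in offer() until the timeout -- 30 minutes in the removed code.
object BlockingQueueHazard {
  def main(args: Array[String]): Unit = {
    val queue = new ArrayBlockingQueue[Int](2) // small bound, like queueSize

    val writeThread = new Thread(() => {
      queue.take() // consume one batch ...
      throw new RuntimeException("simulated native write failure") // ... then die
    })
    writeThread.setDaemon(true)
    writeThread.start()

    for (i <- 1 to 4) {
      // Once the writer is gone and the queue is full, offer() blocks for
      // the full timeout and then gives up (here 1 second, not 30 minutes).
      val accepted = queue.offer(i, 1L, TimeUnit.SECONDS)
      println(s"batch $i accepted: $accepted")
    }
  }
}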
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxColumnarBatchIterator.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxColumnarBatchIterator.scala
deleted file mode 100644
index 0e6aceddf..000000000
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxColumnarBatchIterator.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.datasources
-
-import org.apache.gluten.exception.GlutenException
-
-import org.apache.spark.sql.execution.datasources.VeloxWriteQueue.EOS_BATCH
-import org.apache.spark.sql.vectorized.ColumnarBatch
-
-import org.apache.arrow.memory.BufferAllocator
-import org.apache.arrow.vector.types.pojo.Schema
-
-import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
-
-class VeloxColumnarBatchIterator(schema: Schema, allocator: BufferAllocator, queueSize: Int)
- extends Iterator[ColumnarBatch]
- with AutoCloseable {
- private val writeQueue = new ArrayBlockingQueue[ColumnarBatch](queueSize)
- private var currentBatch: Option[ColumnarBatch] = None
-
- def enqueue(batch: ColumnarBatch): Unit = {
- // Throw exception if the queue is full.
- if (!writeQueue.offer(batch, 30L, TimeUnit.MINUTES)) {
- throw new GlutenException("VeloxParquetWriter: Timeout waiting for adding data")
- }
- }
-
- override def hasNext: Boolean = {
- val batch =
- try {
- writeQueue.poll(30L, TimeUnit.MINUTES)
- } catch {
- case _: InterruptedException =>
- Thread.currentThread().interrupt()
- EOS_BATCH
- }
- if (batch == null) {
- throw new GlutenException("VeloxParquetWriter: Timeout waiting for data")
- }
- if (batch == EOS_BATCH) {
- return false
- }
- currentBatch = Some(batch)
- true
- }
-
- override def next(): ColumnarBatch = {
- try {
- currentBatch match {
- case Some(b) => b
- case _ =>
- throw new IllegalStateException("VeloxParquetWriter: Fatal: Call hasNext() first!")
- }
- } finally {
- currentBatch = None
- }
- }
-
- override def close(): Unit = {
- allocator.close()
- }
-}
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxWriteQueue.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxWriteQueue.scala
deleted file mode 100644
index 6e3e64796..000000000
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxWriteQueue.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.datasources
-
-import org.apache.gluten.datasource.DatasourceJniWrapper
-import org.apache.gluten.utils.iterator.Iterators
-import org.apache.gluten.vectorized.ColumnarBatchInIterator
-
-import org.apache.spark.TaskContext
-import org.apache.spark.sql.execution.datasources.VeloxWriteQueue.EOS_BATCH
-import org.apache.spark.sql.vectorized.ColumnarBatch
-
-import org.apache.arrow.memory.BufferAllocator
-import org.apache.arrow.vector.types.pojo.Schema
-
-import java.util.UUID
-import java.util.concurrent.atomic.AtomicReference
-import java.util.regex.Pattern
-
-import scala.collection.JavaConverters._
-
-// TODO: This probably can be removed: Velox's Parquet writer already supports push-based write.
-class VeloxWriteQueue(
- tc: TaskContext,
- dsHandle: Long,
- schema: Schema,
- allocator: BufferAllocator,
- datasourceJniWrapper: DatasourceJniWrapper,
- outputFileURI: String,
- queueSize: Int)
- extends AutoCloseable {
- private val scanner = new VeloxColumnarBatchIterator(schema, allocator, queueSize)
- private val writeException = new AtomicReference[Throwable]
-
- private val writeThread = new Thread(
- () => {
- TaskContext.setTaskContext(tc)
- try {
- datasourceJniWrapper.write(
- dsHandle,
- new ColumnarBatchInIterator(
- Iterators.wrap(scanner).recyclePayload(_.close()).create().asJava))
- } catch {
- case e: Exception =>
- writeException.set(e)
- }
- },
- "VeloxWriteQueue - " + UUID.randomUUID().toString
- )
-
- writeThread.start()
-
- private def checkWriteException(): Unit = {
- // check if VeloxWriteQueue thread was failed
- val exception = writeException.get()
- if (exception != null) {
- throw exception
- }
- }
-
- def enqueue(batch: ColumnarBatch): Unit = {
- scanner.enqueue(batch)
- checkWriteException()
- }
-
- override def close(): Unit = {
- scanner.enqueue(EOS_BATCH)
- writeThread.join()
- checkWriteException()
- }
-}
-
-object VeloxWriteQueue {
- val EOS_BATCH = new ColumnarBatch(null)
- val TAILING_FILENAME_REGEX = Pattern.compile("^(.*)/([^/]+)$")
-}
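
A second failure mode sits in the class deleted just above: close() enqueues the EOS sentinel and then joins the write thread with no timeout, so a write thread blocked inside the native call pins the task thread indefinitely. A hedged Scala sketch of that shutdown shape (illustrative names, not the real API):

import java.util.concurrent.ArrayBlockingQueue

// Illustrative sketch of the sentinel-plus-join shutdown removed above.
// If the writer never drains the queue, both the put() of the sentinel
// and the unbounded join() can hang forever.
object SentinelShutdown {
  private val EOS = Int.MinValue // stand-in for VeloxWriteQueue.EOS_BATCH

  def main(args: Array[String]): Unit = {
    val queue = new ArrayBlockingQueue[Int](1)

    val writeThread = new Thread(() => {
      while (queue.take() != EOS) {} // drain until the sentinel arrives
    })
    writeThread.start()

    queue.put(42) // one real "batch"
    queue.put(EOS) // close(): send the sentinel ...
    writeThread.join() // ... then wait -- forever, if the writer is stuck
    println("clean shutdown (only because the writer stayed alive)")
  }
}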
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
index 6901bfffd..7da4da5f0 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
@@ -16,8 +16,7 @@
*/
package org.apache.spark.sql.execution.datasources.velox
-import org.apache.gluten.GlutenConfig
-import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.columnarbatch.{ColumnarBatches, ColumnarBatchJniWrapper}
import org.apache.gluten.datasource.DatasourceJniWrapper
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.exec.Runtimes
@@ -31,7 +30,6 @@ import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.utils.SparkArrowUtil
-import org.apache.spark.util.TaskResources
import com.google.common.base.Preconditions
import org.apache.arrow.c.ArrowSchema
@@ -74,29 +72,26 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase {
cSchema.close()
}
- // FIXME: remove this once we support push-based write.
- val queueSize = context.getConfiguration.getInt(GlutenConfig.VELOX_WRITER_QUEUE_SIZE.key, 64)
-
- val writeQueue =
- new VeloxWriteQueue(
- TaskResources.getLocalTaskContext(),
- dsHandle,
- arrowSchema,
- allocator,
- datasourceJniWrapper,
- filePath,
- queueSize)
-
new OutputWriter {
override def write(row: InternalRow): Unit = {
val batch = row.asInstanceOf[FakeRow].batch
Preconditions.checkState(ColumnarBatches.isLightBatch(batch))
ColumnarBatches.retain(batch)
- writeQueue.enqueue(batch)
+ val batchHandle = {
+ if (batch.numCols == 0) {
+ // the operation will find a zero column batch from a task-local pool
+ ColumnarBatchJniWrapper.create(runtime).getForEmptySchema(batch.numRows)
+ } else {
+ val offloaded =
+ ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance, batch)
+ ColumnarBatches.getNativeHandle(offloaded)
+ }
+ }
+ datasourceJniWrapper.writeBatch(dsHandle, batchHandle)
+ batch.close()
}
override def close(): Unit = {
- writeQueue.close()
datasourceJniWrapper.close(dsHandle)
}
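
The replacement path above is synchronous: each batch is offloaded to native memory and written through a single JNI call on the task thread, so there is no second thread, queue, or sentinel left to coordinate. A rough sketch of the resulting control flow -- hypothetical trait and helper names, not the actual Gluten API; the real entry point is DatasourceJniWrapper.writeBatch(dsHandle, batchHandle):

// Hypothetical shape of the push-based flow after this change.
trait NativeWriter {
  def writeBatch(dsHandle: Long, batchHandle: Long): Unit
  def close(dsHandle: Long): Unit
}

def writeAll(writer: NativeWriter, dsHandle: Long, batchHandles: Iterator[Long]): Unit = {
  try {
    // No queue, no EOS sentinel, no background thread: a failure inside
    // writeBatch surfaces immediately on the calling (task) thread.
    batchHandles.foreach(h => writer.writeBatch(dsHandle, h))
  } finally {
    writer.close(dsHandle)
  }
}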
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index f39f9c923..4fa45d9cb 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -1093,22 +1093,16 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_datasource_DatasourceJniWrapper_cl
JNI_METHOD_END()
}
-JNIEXPORT void JNICALL Java_org_apache_gluten_datasource_DatasourceJniWrapper_write( // NOLINT
+JNIEXPORT void JNICALL Java_org_apache_gluten_datasource_DatasourceJniWrapper_writeBatch( // NOLINT
JNIEnv* env,
jobject wrapper,
jlong dsHandle,
- jobject jIter) {
+ jlong batchHandle) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);
auto datasource = ObjectStore::retrieve<Datasource>(dsHandle);
- auto iter = makeJniColumnarBatchIterator(env, jIter, ctx, nullptr);
- while (true) {
- auto batch = iter->next();
- if (!batch) {
- break;
- }
- datasource->write(batch);
- }
+ auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
+ datasource->write(batch);
JNI_METHOD_END()
}
diff --git a/gluten-data/src/main/java/org/apache/gluten/datasource/DatasourceJniWrapper.java b/gluten-data/src/main/java/org/apache/gluten/datasource/DatasourceJniWrapper.java
index 5a34196c4..11ed3fb7d 100644
--- a/gluten-data/src/main/java/org/apache/gluten/datasource/DatasourceJniWrapper.java
+++ b/gluten-data/src/main/java/org/apache/gluten/datasource/DatasourceJniWrapper.java
@@ -19,7 +19,6 @@ package org.apache.gluten.datasource;
import org.apache.gluten.exec.Runtime;
import org.apache.gluten.exec.RuntimeAware;
import org.apache.gluten.init.JniUtils;
-import org.apache.gluten.vectorized.ColumnarBatchInIterator;
import org.apache.spark.sql.execution.datasources.BlockStripes;
@@ -53,7 +52,7 @@ public class DatasourceJniWrapper implements RuntimeAware {
public native void close(long dsHandle);
- public native void write(long dsHandle, ColumnarBatchInIterator iterator);
+ public native void writeBatch(long dsHandle, long batchHandle);
public native BlockStripes splitBlockByPartitionAndBucket(
long blockAddress, int[] partitionColIndice, boolean hasBucket);
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index c05069c38..820978112 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -1565,13 +1565,6 @@ object GlutenConfig {
.booleanConf
.createOptional
- val VELOX_WRITER_QUEUE_SIZE =
- buildConf("spark.gluten.sql.velox.writer.queue.size")
- .internal()
- .doc("This is config to specify the velox writer queue size")
- .intConf
- .createWithDefault(64)
-
val NATIVE_HIVEFILEFORMAT_WRITER_ENABLED =
buildConf("spark.gluten.sql.native.hive.writer.enabled")
.internal()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]