This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 807d9f77c [GLUTEN-6645][VL] Remove VeloxWriteQueue which may introduce deadlock (#6646)
807d9f77c is described below

commit 807d9f77cb412f3ab90ea9e15408019c52941399
Author: WangGuangxin <[email protected]>
AuthorDate: Fri Aug 2 11:26:48 2024 +0800

    [GLUTEN-6645][VL] Remove VeloxWriteQueue which may introduce deadlock (#6646)
---
 .../datasources/VeloxColumnarBatchIterator.scala   | 76 ------------------
 .../execution/datasources/VeloxWriteQueue.scala    | 90 ----------------------
 .../velox/VeloxFormatWriterInjects.scala           | 31 ++++----
 cpp/core/jni/JniWrapper.cc                         | 14 +---
 .../gluten/datasource/DatasourceJniWrapper.java    |  3 +-
 .../scala/org/apache/gluten/GlutenConfig.scala     |  7 --
 6 files changed, 18 insertions(+), 203 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxColumnarBatchIterator.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxColumnarBatchIterator.scala
deleted file mode 100644
index 0e6aceddf..000000000
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxColumnarBatchIterator.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.datasources
-
-import org.apache.gluten.exception.GlutenException
-
-import org.apache.spark.sql.execution.datasources.VeloxWriteQueue.EOS_BATCH
-import org.apache.spark.sql.vectorized.ColumnarBatch
-
-import org.apache.arrow.memory.BufferAllocator
-import org.apache.arrow.vector.types.pojo.Schema
-
-import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
-
-class VeloxColumnarBatchIterator(schema: Schema, allocator: BufferAllocator, queueSize: Int)
-  extends Iterator[ColumnarBatch]
-  with AutoCloseable {
-  private val writeQueue = new ArrayBlockingQueue[ColumnarBatch](queueSize)
-  private var currentBatch: Option[ColumnarBatch] = None
-
-  def enqueue(batch: ColumnarBatch): Unit = {
-    // Throw exception if the queue is full.
-    if (!writeQueue.offer(batch, 30L, TimeUnit.MINUTES)) {
-      throw new GlutenException("VeloxParquetWriter: Timeout waiting for adding data")
-    }
-  }
-
-  override def hasNext: Boolean = {
-    val batch =
-      try {
-        writeQueue.poll(30L, TimeUnit.MINUTES)
-      } catch {
-        case _: InterruptedException =>
-          Thread.currentThread().interrupt()
-          EOS_BATCH
-      }
-    if (batch == null) {
-      throw new GlutenException("VeloxParquetWriter: Timeout waiting for data")
-    }
-    if (batch == EOS_BATCH) {
-      return false
-    }
-    currentBatch = Some(batch)
-    true
-  }
-
-  override def next(): ColumnarBatch = {
-    try {
-      currentBatch match {
-        case Some(b) => b
-        case _ =>
-          throw new IllegalStateException("VeloxParquetWriter: Fatal: Call hasNext() first!")
-      }
-    } finally {
-      currentBatch = None
-    }
-  }
-
-  override def close(): Unit = {
-    allocator.close()
-  }
-}
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxWriteQueue.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxWriteQueue.scala
deleted file mode 100644
index 6e3e64796..000000000
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxWriteQueue.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.datasources
-
-import org.apache.gluten.datasource.DatasourceJniWrapper
-import org.apache.gluten.utils.iterator.Iterators
-import org.apache.gluten.vectorized.ColumnarBatchInIterator
-
-import org.apache.spark.TaskContext
-import org.apache.spark.sql.execution.datasources.VeloxWriteQueue.EOS_BATCH
-import org.apache.spark.sql.vectorized.ColumnarBatch
-
-import org.apache.arrow.memory.BufferAllocator
-import org.apache.arrow.vector.types.pojo.Schema
-
-import java.util.UUID
-import java.util.concurrent.atomic.AtomicReference
-import java.util.regex.Pattern
-
-import scala.collection.JavaConverters._
-
-// TODO: This probably can be removed: Velox's Parquet writer already supports push-based write.
-class VeloxWriteQueue(
-    tc: TaskContext,
-    dsHandle: Long,
-    schema: Schema,
-    allocator: BufferAllocator,
-    datasourceJniWrapper: DatasourceJniWrapper,
-    outputFileURI: String,
-    queueSize: Int)
-  extends AutoCloseable {
-  private val scanner = new VeloxColumnarBatchIterator(schema, allocator, queueSize)
-  private val writeException = new AtomicReference[Throwable]
-
-  private val writeThread = new Thread(
-    () => {
-      TaskContext.setTaskContext(tc)
-      try {
-        datasourceJniWrapper.write(
-          dsHandle,
-          new ColumnarBatchInIterator(
-            Iterators.wrap(scanner).recyclePayload(_.close()).create().asJava))
-      } catch {
-        case e: Exception =>
-          writeException.set(e)
-      }
-    },
-    "VeloxWriteQueue - " + UUID.randomUUID().toString
-  )
-
-  writeThread.start()
-
-  private def checkWriteException(): Unit = {
-    // check if VeloxWriteQueue thread was failed
-    val exception = writeException.get()
-    if (exception != null) {
-      throw exception
-    }
-  }
-
-  def enqueue(batch: ColumnarBatch): Unit = {
-    scanner.enqueue(batch)
-    checkWriteException()
-  }
-
-  override def close(): Unit = {
-    scanner.enqueue(EOS_BATCH)
-    writeThread.join()
-    checkWriteException()
-  }
-}
-
-object VeloxWriteQueue {
-  val EOS_BATCH = new ColumnarBatch(null)
-  val TAILING_FILENAME_REGEX = Pattern.compile("^(.*)/([^/]+)$")
-}
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
index 6901bfffd..7da4da5f0 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
@@ -16,8 +16,7 @@
  */
 package org.apache.spark.sql.execution.datasources.velox
 
-import org.apache.gluten.GlutenConfig
-import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.columnarbatch.{ColumnarBatches, ColumnarBatchJniWrapper}
 import org.apache.gluten.datasource.DatasourceJniWrapper
 import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.exec.Runtimes
@@ -31,7 +30,6 @@ import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.utils.SparkArrowUtil
-import org.apache.spark.util.TaskResources
 
 import com.google.common.base.Preconditions
 import org.apache.arrow.c.ArrowSchema
@@ -74,29 +72,26 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase {
       cSchema.close()
     }
 
-    // FIXME: remove this once we support push-based write.
-    val queueSize = context.getConfiguration.getInt(GlutenConfig.VELOX_WRITER_QUEUE_SIZE.key, 64)
-
-    val writeQueue =
-      new VeloxWriteQueue(
-        TaskResources.getLocalTaskContext(),
-        dsHandle,
-        arrowSchema,
-        allocator,
-        datasourceJniWrapper,
-        filePath,
-        queueSize)
-
     new OutputWriter {
       override def write(row: InternalRow): Unit = {
         val batch = row.asInstanceOf[FakeRow].batch
         Preconditions.checkState(ColumnarBatches.isLightBatch(batch))
         ColumnarBatches.retain(batch)
-        writeQueue.enqueue(batch)
+        val batchHandle = {
+          if (batch.numCols == 0) {
+            // the operation will find a zero column batch from a task-local pool
+            ColumnarBatchJniWrapper.create(runtime).getForEmptySchema(batch.numRows)
+          } else {
+            val offloaded =
+              ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance, batch)
+            ColumnarBatches.getNativeHandle(offloaded)
+          }
+        }
+        datasourceJniWrapper.writeBatch(dsHandle, batchHandle)
+        batch.close()
       }
 
       override def close(): Unit = {
-        writeQueue.close()
         datasourceJniWrapper.close(dsHandle)
       }
 
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index f39f9c923..4fa45d9cb 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -1093,22 +1093,16 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_datasource_DatasourceJniWrapper_cl
   JNI_METHOD_END()
 }
 
-JNIEXPORT void JNICALL Java_org_apache_gluten_datasource_DatasourceJniWrapper_write( // NOLINT
+JNIEXPORT void JNICALL Java_org_apache_gluten_datasource_DatasourceJniWrapper_writeBatch( // NOLINT
     JNIEnv* env,
     jobject wrapper,
     jlong dsHandle,
-    jobject jIter) {
+    jlong batchHandle) {
   JNI_METHOD_START
   auto ctx = gluten::getRuntime(env, wrapper);
   auto datasource = ObjectStore::retrieve<Datasource>(dsHandle);
-  auto iter = makeJniColumnarBatchIterator(env, jIter, ctx, nullptr);
-  while (true) {
-    auto batch = iter->next();
-    if (!batch) {
-      break;
-    }
-    datasource->write(batch);
-  }
+  auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
+  datasource->write(batch);
   JNI_METHOD_END()
 }
 
diff --git a/gluten-data/src/main/java/org/apache/gluten/datasource/DatasourceJniWrapper.java b/gluten-data/src/main/java/org/apache/gluten/datasource/DatasourceJniWrapper.java
index 5a34196c4..11ed3fb7d 100644
--- a/gluten-data/src/main/java/org/apache/gluten/datasource/DatasourceJniWrapper.java
+++ b/gluten-data/src/main/java/org/apache/gluten/datasource/DatasourceJniWrapper.java
@@ -19,7 +19,6 @@ package org.apache.gluten.datasource;
 import org.apache.gluten.exec.Runtime;
 import org.apache.gluten.exec.RuntimeAware;
 import org.apache.gluten.init.JniUtils;
-import org.apache.gluten.vectorized.ColumnarBatchInIterator;
 
 import org.apache.spark.sql.execution.datasources.BlockStripes;
 
@@ -53,7 +52,7 @@ public class DatasourceJniWrapper implements RuntimeAware {
 
   public native void close(long dsHandle);
 
-  public native void write(long dsHandle, ColumnarBatchInIterator iterator);
+  public native void writeBatch(long dsHandle, long batchHandle);
 
   public native BlockStripes splitBlockByPartitionAndBucket(
       long blockAddress, int[] partitionColIndice, boolean hasBucket);
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index c05069c38..820978112 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -1565,13 +1565,6 @@ object GlutenConfig {
       .booleanConf
       .createOptional
 
-  val VELOX_WRITER_QUEUE_SIZE =
-    buildConf("spark.gluten.sql.velox.writer.queue.size")
-      .internal()
-      .doc("This is config to specify the velox writer queue size")
-      .intConf
-      .createWithDefault(64)
-
   val NATIVE_HIVEFILEFORMAT_WRITER_ENABLED =
     buildConf("spark.gluten.sql.native.hive.writer.enabled")
       .internal()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to