This is an automated email from the ASF dual-hosted git repository.

kejia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 9fe8d9299f [GLUTEN-9801] Only delete the files written by the failed task when calling the abortTask() method (#9844)
9fe8d9299f is described below

commit 9fe8d9299fa685f18ca7d03e8ee1dcfc14990992
Author: JiaKe <[email protected]>
AuthorDate: Wed Jul 23 09:37:28 2025 +0800

    [GLUTEN-9801] Only delete the files written by the failed task when calling the abortTask() method (#9844)
---
 .../backendsapi/velox/VeloxIteratorApi.scala       |  2 +-
 .../execution/SparkWriteFilesCommitProtocol.scala  | 26 +++++++++++++--
 .../execution/VeloxColumnarWriteFilesExec.scala    |  7 ++--
 .../sql/execution/VeloxParquetWriteSuite.scala     | 33 ++++++++++++------
 cpp/core/compute/Runtime.cc                        |  7 ++++
 cpp/core/compute/Runtime.h                         |  1 +
 cpp/core/jni/JniWrapper.cc                         |  8 ++++-
 cpp/velox/compute/VeloxPlanConverter.cc            |  3 +-
 cpp/velox/compute/VeloxPlanConverter.h             |  1 +
 cpp/velox/compute/VeloxRuntime.cc                  |  9 +++--
 cpp/velox/cudf/CudfPlanValidator.cc                |  3 +-
 cpp/velox/substrait/SubstraitToVeloxPlan.cc        | 39 ++++++----------------
 cpp/velox/substrait/SubstraitToVeloxPlan.h         |  8 ++++-
 .../substrait/SubstraitToVeloxPlanValidator.h      |  3 +-
 .../Substrait2VeloxValuesNodeConversionTest.cc     |  2 +-
 cpp/velox/tests/VeloxSubstraitRoundTripTest.cc     |  5 +--
 .../gluten/vectorized/NativePlanEvaluator.java     |  5 +--
 .../gluten/vectorized/PlanEvaluatorJniWrapper.java |  2 +-
 18 files changed, 107 insertions(+), 57 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index c8aa07728f..fa713549fd 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -217,7 +217,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
   }
 
   override def injectWriteFilesTempPath(path: String, fileName: String): Unit 
= {
-    NativePlanEvaluator.injectWriteFilesTempPath(path)
+    NativePlanEvaluator.injectWriteFilesTempPath(path, fileName)
   }
 
   /** Generate Iterator[ColumnarBatch] for first stage. */
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
index 197b57f592..13a9b987f3 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.io.{FileCommitProtocol, 
HadoopMapReduceCommitProtocol}
+import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec, 
HadoopMapReduceCommitProtocol}
 import org.apache.spark.sql.execution.datasources.WriteJobDescription
 import org.apache.spark.util.Utils
 
@@ -28,6 +28,9 @@ import 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 
 import java.lang.reflect.Field
+import java.util.UUID
+
+import scala.collection.mutable
 
 /**
  * A wrapper for [[HadoopMapReduceCommitProtocol]]. This class only affects 
the task side commit
@@ -49,6 +52,8 @@ class SparkWriteFilesCommitProtocol(
   private val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
   private val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
 
+  private var fileNames: mutable.Set[String] = null
+
   // Set up the attempt context required to use in the output committer.
   val taskAttemptContext: TaskAttemptContext = {
     // Set up the configuration object
@@ -70,10 +75,22 @@ class SparkWriteFilesCommitProtocol(
 
   def setupTask(): Unit = {
     committer.setupTask(taskAttemptContext)
+    fileNames = mutable.Set[String]()
   }
 
   def getJobId: String = jobId.toString
 
+  // Copied from `HadoopMapReduceCommitProtocol.getFilename`.
+  def getFilename(spec: FileNameSpec): String = {
+    // The file name looks like 
part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet
+    // Note that %05d does not truncate the split number, so if we have more 
than 100000 tasks,
+    // the file name is fine and won't overflow.
+    val split = taskAttemptContext.getTaskAttemptID.getTaskID.getId
+    val fileName = 
f"${spec.prefix}part-$split%05d-${UUID.randomUUID().toString()}${spec.suffix}"
+    fileNames += fileName
+    fileName
+  }
+
   def newTaskAttemptTempPath(): String = {
     assert(internalCommitter != null)
     val stagingDir: Path = internalCommitter match {
@@ -100,8 +117,11 @@ class SparkWriteFilesCommitProtocol(
   def abortTask(writePath: String): Unit = {
     committer.abortTask(taskAttemptContext)
 
-    val tmpPath = new Path(writePath)
-    tmpPath.getFileSystem(taskAttemptContext.getConfiguration).delete(tmpPath, 
true)
+    // Deletes the files written by current task.
+    for (fileName <- fileNames) {
+      val filePath = new Path(writePath, fileName)
+      
filePath.getFileSystem(taskAttemptContext.getConfiguration).delete(filePath, 
false)
+    }
   }
 
   // Copied from `SparkHadoopWriterUtils.createJobID` to be compatible with 
multi-version
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
index 1641351e6b..84f6134aba 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
@@ -23,7 +23,7 @@ import org.apache.gluten.execution.WriteFilesExecTransformer
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 
 import org.apache.spark.{Partition, SparkException, TaskContext, 
TaskOutputFileAlreadyExistException}
-import org.apache.spark.internal.io.{FileCommitProtocol, 
SparkHadoopWriterUtils}
+import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec, 
SparkHadoopWriterUtils}
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.rdd.RDD
 import org.apache.spark.shuffle.FetchFailedException
@@ -195,11 +195,14 @@ class VeloxColumnarWriteFilesRDD(
 
     commitProtocol.setupTask()
     val writePath = commitProtocol.newTaskAttemptTempPath()
+    val suffix = 
description.outputWriterFactory.getFileExtension(commitProtocol.taskAttemptContext)
+    val fileNameSpec = FileNameSpec("", suffix)
+    val fileName = commitProtocol.getFilename(fileNameSpec)
     logDebug(s"Velox staging write path: $writePath")
     var writeTaskResult: WriteTaskResult = null
     try {
       Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
-        
BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath, 
"")
+        
BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath, 
fileName)
 
         // Initialize the native plan
         val iter = firstParent[ColumnarBatch].iterator(split, context)
diff --git 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
index 3b3129090b..81c9870a52 100644
--- 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
@@ -33,16 +33,29 @@ class VeloxParquetWriteSuite extends 
VeloxWholeStageTransformerSuite {
   override protected val fileFormat: String = "parquet"
 
   // The parquet compression codec extensions
-  private val parquetCompressionCodecExtensions = Map(
-    "none" -> "",
-    "uncompressed" -> "",
-    "snappy" -> ".snappy",
-    "gzip" -> ".gz",
-    "lzo" -> ".lzo",
-    "lz4" -> ".lz4",
-    "brotli" -> ".br",
-    "zstd" -> ".zstd"
-  )
+  private val parquetCompressionCodecExtensions = if (isSparkVersionGE("3.5")) 
{
+    Map(
+      "none" -> "",
+      "uncompressed" -> "",
+      "snappy" -> ".snappy",
+      "gzip" -> ".gz",
+      "lzo" -> ".lzo",
+      "lz4" -> ".lz4hadoop", // Specific extension for version 3.5
+      "brotli" -> ".br",
+      "zstd" -> ".zstd"
+    )
+  } else {
+    Map(
+      "none" -> "",
+      "uncompressed" -> "",
+      "snappy" -> ".snappy",
+      "gzip" -> ".gz",
+      "lzo" -> ".lzo",
+      "lz4" -> ".lz4",
+      "brotli" -> ".br",
+      "zstd" -> ".zstd"
+    )
+  }
 
   private def getParquetFileExtension(codec: String): String = {
     s"${parquetCompressionCodecExtensions(codec)}.parquet"
diff --git a/cpp/core/compute/Runtime.cc b/cpp/core/compute/Runtime.cc
index 30b5668000..f5b4ed0f33 100644
--- a/cpp/core/compute/Runtime.cc
+++ b/cpp/core/compute/Runtime.cc
@@ -58,4 +58,11 @@ std::optional<std::string>* 
Runtime::localWriteFilesTempPath() {
   return &path;
 }
 
+std::optional<std::string>* Runtime::localWriteFileName() {
+  // This is thread-local to conform to Java side ColumnarWriteFilesExec's 
design.
+  // FIXME: Pass the path through relevant member functions.
+  static thread_local std::optional<std::string> fileName;
+  return &fileName;
+}
+
 } // namespace gluten
diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index 00498a9e54..4eda64129a 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -69,6 +69,7 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
       const std::unordered_map<std::string, std::string>& sessionConf = {});
   static void release(Runtime*);
   static std::optional<std::string>* localWriteFilesTempPath();
+  static std::optional<std::string>* localWriteFileName();
 
   Runtime(
       const std::string& kind,
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 0419436ef1..5cca5dd2ac 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -380,12 +380,18 @@ JNIEXPORT jstring JNICALL 
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrap
 JNIEXPORT void JNICALL 
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_injectWriteFilesTempPath(
 // NOLINT
     JNIEnv* env,
     jclass,
-    jbyteArray path) {
+    jbyteArray path,
+    jbyteArray fileName) {
   JNI_METHOD_START
   auto len = env->GetArrayLength(path);
   auto safeArray = getByteArrayElementsSafe(env, path);
   std::string pathStr(reinterpret_cast<char*>(safeArray.elems()), len);
   *Runtime::localWriteFilesTempPath() = pathStr;
+
+  len = env->GetArrayLength(fileName);
+  auto fileNameArray = getByteArrayElementsSafe(env, fileName);
+  std::string fileNameStr(reinterpret_cast<char*>(fileNameArray.elems()), len);
+  *Runtime::localWriteFileName() = fileNameStr;
   JNI_METHOD_END()
 }
 
diff --git a/cpp/velox/compute/VeloxPlanConverter.cc 
b/cpp/velox/compute/VeloxPlanConverter.cc
index a3ae60cd0b..5529ffb90a 100644
--- a/cpp/velox/compute/VeloxPlanConverter.cc
+++ b/cpp/velox/compute/VeloxPlanConverter.cc
@@ -33,9 +33,10 @@ VeloxPlanConverter::VeloxPlanConverter(
     velox::memory::MemoryPool* veloxPool,
     const facebook::velox::config::ConfigBase* veloxCfg,
     const std::optional<std::string> writeFilesTempPath,
+    const std::optional<std::string> writeFileName,
     bool validationMode)
     : validationMode_(validationMode),
-      substraitVeloxPlanConverter_(veloxPool, veloxCfg, writeFilesTempPath, 
validationMode) {
+      substraitVeloxPlanConverter_(veloxPool, veloxCfg, writeFilesTempPath, 
writeFileName, validationMode) {
   substraitVeloxPlanConverter_.setInputIters(std::move(inputIters));
 }
 
diff --git a/cpp/velox/compute/VeloxPlanConverter.h 
b/cpp/velox/compute/VeloxPlanConverter.h
index 2528a46914..7a14693cb7 100644
--- a/cpp/velox/compute/VeloxPlanConverter.h
+++ b/cpp/velox/compute/VeloxPlanConverter.h
@@ -34,6 +34,7 @@ class VeloxPlanConverter {
       facebook::velox::memory::MemoryPool* veloxPool,
       const facebook::velox::config::ConfigBase* veloxCfg,
       const std::optional<std::string> writeFilesTempPath = std::nullopt,
+      const std::optional<std::string> writeFileName = std::nullopt,
       bool validationMode = false);
 
   std::shared_ptr<const facebook::velox::core::PlanNode> toVeloxPlan(
diff --git a/cpp/velox/compute/VeloxRuntime.cc 
b/cpp/velox/compute/VeloxRuntime.cc
index a45151168f..c80306e64b 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -138,7 +138,8 @@ void VeloxRuntime::getInfoAndIds(
 std::string VeloxRuntime::planString(bool details, const 
std::unordered_map<std::string, std::string>& sessionConf) {
   std::vector<std::shared_ptr<ResultIterator>> inputs;
   auto veloxMemoryPool = gluten::defaultLeafVeloxMemoryPool();
-  VeloxPlanConverter veloxPlanConverter(inputs, veloxMemoryPool.get(), 
veloxCfg_.get(), std::nullopt, true);
+  VeloxPlanConverter veloxPlanConverter(
+      inputs, veloxMemoryPool.get(), veloxCfg_.get(), std::nullopt, 
std::nullopt, true);
   auto veloxPlan = veloxPlanConverter.toVeloxPlan(substraitPlan_, localFiles_);
   return veloxPlan->toString(details, true);
 }
@@ -156,7 +157,11 @@ std::shared_ptr<ResultIterator> 
VeloxRuntime::createResultIterator(
   LOG_IF(INFO, debugModeEnabled_) << "VeloxRuntime session config:" << 
printConfig(confMap_);
 
   VeloxPlanConverter veloxPlanConverter(
-      inputs, memoryManager()->getLeafMemoryPool().get(), veloxCfg_.get(), 
*localWriteFilesTempPath());
+      inputs,
+      memoryManager()->getLeafMemoryPool().get(),
+      veloxCfg_.get(),
+      *localWriteFilesTempPath(),
+      *localWriteFileName());
   veloxPlan_ = veloxPlanConverter.toVeloxPlan(substraitPlan_, 
std::move(localFiles_));
   LOG_IF(INFO, debugModeEnabled_ && taskInfo_.has_value())
       << "############### Velox plan for task " << taskInfo_.value() << " 
###############" << std::endl
diff --git a/cpp/velox/cudf/CudfPlanValidator.cc 
b/cpp/velox/cudf/CudfPlanValidator.cc
index 177613e8cc..f1a58577f6 100644
--- a/cpp/velox/cudf/CudfPlanValidator.cc
+++ b/cpp/velox/cudf/CudfPlanValidator.cc
@@ -35,7 +35,8 @@ bool CudfPlanValidator::validate(const ::substrait::Plan& 
substraitPlan) {
   std::vector<std::shared_ptr<ResultIterator>> inputs;
   std::shared_ptr<facebook::velox::config::ConfigBase> veloxCfg =
       
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
 std::string>());
-  VeloxPlanConverter veloxPlanConverter(inputs, veloxMemoryPool.get(), 
veloxCfg.get(), std::nullopt, true);
+  VeloxPlanConverter veloxPlanConverter(
+      inputs, veloxMemoryPool.get(), veloxCfg.get(), std::nullopt, 
std::nullopt, true);
   auto planNode = veloxPlanConverter.toVeloxPlan(substraitPlan, localFiles);
   std::unordered_set<velox::core::PlanNodeId> emptySet;
   velox::core::PlanFragment planFragment{planNode, 
velox::core::ExecutionStrategy::kUngrouped, 1, emptySet};
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc 
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 72cc7d88e9..8db52ab6eb 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -17,8 +17,6 @@
 
 #include "SubstraitToVeloxPlan.h"
 
-#include "utils/StringUtil.h"
-
 #include "TypeUtils.h"
 #include "VariantToVectorConverter.h"
 #include "operators/plannodes/RowVectorStream.h"
@@ -506,32 +504,9 @@ core::PlanNodePtr 
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
   }
 }
 
-std::string makeUuid() {
-  return generateUuid();
-}
-
-std::string compressionFileNameSuffix(common::CompressionKind kind) {
-  switch (static_cast<int32_t>(kind)) {
-    case common::CompressionKind_ZLIB:
-      return ".zlib";
-    case common::CompressionKind_SNAPPY:
-      return ".snappy";
-    case common::CompressionKind_LZO:
-      return ".lzo";
-    case common::CompressionKind_ZSTD:
-      return ".zstd";
-    case common::CompressionKind_LZ4:
-      return ".lz4";
-    case common::CompressionKind_GZIP:
-      return ".gz";
-    case common::CompressionKind_NONE:
-    default:
-      return "";
-  }
-}
-
 std::shared_ptr<connector::hive::LocationHandle> makeLocationHandle(
     const std::string& targetDirectory,
+    const std::string& fileName,
     dwio::common::FileFormat fileFormat,
     common::CompressionKind compression,
     const bool& isBucketed,
@@ -540,7 +515,7 @@ std::shared_ptr<connector::hive::LocationHandle> 
makeLocationHandle(
         connector::hive::LocationHandle::TableType::kExisting) {
   std::string targetFileName = "";
   if (fileFormat == dwio::common::FileFormat::PARQUET && !isBucketed) {
-    targetFileName = fmt::format("gluten-part-{}{}{}", makeUuid(), 
compressionFileNameSuffix(compression), ".parquet");
+    targetFileName = fileName;
   }
   return std::make_shared<connector::hive::LocationHandle>(
       targetDirectory, writeDirectory.value_or(targetDirectory), tableType, 
targetFileName);
@@ -672,6 +647,14 @@ core::PlanNodePtr 
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
     writePath = "";
   }
 
+  std::string fileName;
+  if (writeFileName_.has_value()) {
+    fileName = writeFileName_.value();
+  } else {
+    VELOX_CHECK(validationMode_, "WriteRel should have the write path before 
initializing the plan.");
+    fileName = "";
+  }
+
   GLUTEN_CHECK(writeRel.named_table().has_advanced_extension(), "Advanced 
extension not found in WriteRel");
   const auto& ext = writeRel.named_table().advanced_extension();
   GLUTEN_CHECK(ext.has_optimization(), "Extension optimization not found in 
WriteRel");
@@ -706,7 +689,7 @@ core::PlanNodePtr 
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
               inputType->children(),
               partitionedKey,
               bucketProperty,
-              makeLocationHandle(writePath, fileFormat, compressionKind, 
bucketProperty != nullptr),
+              makeLocationHandle(writePath, fileName, fileFormat, 
compressionKind, bucketProperty != nullptr),
               writerOptions,
               fileFormat,
               compressionKind)),
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h 
b/cpp/velox/substrait/SubstraitToVeloxPlan.h
index 3d90b6ebe3..1d816da612 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h
@@ -66,8 +66,13 @@ class SubstraitToVeloxPlanConverter {
       memory::MemoryPool* pool,
       const facebook::velox::config::ConfigBase* veloxCfg,
       const std::optional<std::string> writeFilesTempPath = std::nullopt,
+      const std::optional<std::string> writeFileName = std::nullopt,
       bool validationMode = false)
-      : pool_(pool), veloxCfg_(veloxCfg), 
writeFilesTempPath_(writeFilesTempPath), validationMode_(validationMode) {
+      : pool_(pool),
+        veloxCfg_(veloxCfg),
+        writeFilesTempPath_(writeFilesTempPath),
+        writeFileName_(writeFileName),
+        validationMode_(validationMode) {
     VELOX_USER_CHECK_NOT_NULL(veloxCfg_);
   }
 
@@ -284,6 +289,7 @@ class SubstraitToVeloxPlanConverter {
 
   /// The temporary path used to write files.
   std::optional<std::string> writeFilesTempPath_;
+  std::optional<std::string> writeFileName_;
 
   /// A flag used to specify validation.
   bool validationMode_ = false;
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h 
b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h
index 370b9501df..122a6b7d4a 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h
@@ -32,7 +32,8 @@ class SubstraitToVeloxPlanValidator {
     std::unordered_map<std::string, std::string> configs{
         {velox::core::QueryConfig::kSparkPartitionId, "0"}, 
{velox::core::QueryConfig::kSessionTimezone, "GMT"}};
     veloxCfg_ = 
std::make_shared<facebook::velox::config::ConfigBase>(std::move(configs));
-    planConverter_ = std::make_unique<SubstraitToVeloxPlanConverter>(pool, 
veloxCfg_.get(), std::nullopt, true);
+    planConverter_ =
+        std::make_unique<SubstraitToVeloxPlanConverter>(pool, veloxCfg_.get(), 
std::nullopt, std::nullopt, true);
     queryCtx_ = velox::core::QueryCtx::create(nullptr, 
velox::core::QueryConfig(veloxCfg_->rawConfigs()));
     // An execution context used for function validation.
     execCtx_ = std::make_unique<velox::core::ExecCtx>(pool, queryCtx_.get());
diff --git a/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc 
b/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc
index ef8dd8cd03..0a2b409526 100644
--- a/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc
+++ b/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc
@@ -43,7 +43,7 @@ TEST_F(Substrait2VeloxValuesNodeConversionTest, valuesNode) {
   JsonToProtoConverter::readFromFile(planPath, substraitPlan);
   auto veloxCfg = 
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
 std::string>());
   std::shared_ptr<SubstraitToVeloxPlanConverter> planConverter_ =
-      std::make_shared<SubstraitToVeloxPlanConverter>(pool_.get(), 
veloxCfg.get(), std::nullopt, true);
+      std::make_shared<SubstraitToVeloxPlanConverter>(pool_.get(), 
veloxCfg.get(), std::nullopt, std::nullopt, true);
   auto veloxPlan = planConverter_->toVeloxPlan(substraitPlan);
 
   RowVectorPtr expectedData = makeRowVector(
diff --git a/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc 
b/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc
index 60cafd1c6f..8eeb2818ce 100644
--- a/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc
+++ b/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc
@@ -71,7 +71,7 @@ class VeloxSubstraitRoundTripTest : public OperatorTestBase {
     auto veloxCfg =
         
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
 std::string>());
     std::shared_ptr<SubstraitToVeloxPlanConverter> substraitConverter_ =
-        std::make_shared<SubstraitToVeloxPlanConverter>(pool_.get(), 
veloxCfg.get(), std::nullopt, true);
+        std::make_shared<SubstraitToVeloxPlanConverter>(pool_.get(), 
veloxCfg.get(), std::nullopt, std::nullopt, true);
 
     // Convert Substrait Plan to the same Velox Plan.
     auto samePlan = substraitConverter_->toVeloxPlan(substraitPlan);
@@ -92,7 +92,8 @@ class VeloxSubstraitRoundTripTest : public OperatorTestBase {
       auto veloxCfg =
           
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
 std::string>());
       std::shared_ptr<SubstraitToVeloxPlanConverter> substraitConverter_ =
-          std::make_shared<SubstraitToVeloxPlanConverter>(pool_.get(), 
veloxCfg.get(), std::nullopt, true);
+          std::make_shared<SubstraitToVeloxPlanConverter>(
+              pool_.get(), veloxCfg.get(), std::nullopt, std::nullopt, true);
       // Convert Substrait Plan to the same Velox Plan.
       auto samePlan = substraitConverter_->toVeloxPlan(substraitPlan);
 
diff --git 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
index 44d4107c56..b3889fb231 100644
--- 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
+++ 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
@@ -54,8 +54,9 @@ public class NativePlanEvaluator {
     return jniWrapper.nativeValidateWithFailureReason(subPlan);
   }
 
-  public static void injectWriteFilesTempPath(String path) {
-    
PlanEvaluatorJniWrapper.injectWriteFilesTempPath(path.getBytes(StandardCharsets.UTF_8));
+  public static void injectWriteFilesTempPath(String path, String fileName) {
+    PlanEvaluatorJniWrapper.injectWriteFilesTempPath(
+        path.getBytes(StandardCharsets.UTF_8), 
fileName.getBytes(StandardCharsets.UTF_8));
   }
 
   // Used by WholeStageTransform to create the native computing pipeline and
diff --git 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
index 39a543422d..502bfdbcaf 100644
--- 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
+++ 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
@@ -41,7 +41,7 @@ public class PlanEvaluatorJniWrapper implements RuntimeAware {
     return runtime.getHandle();
   }
 
-  public static native void injectWriteFilesTempPath(byte[] path);
+  public static native void injectWriteFilesTempPath(byte[] path, byte[] 
fileName);
 
   /**
    * Validate the Substrait plan in native compute engine.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to